diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index 0ca2e92..0000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,35 +0,0 @@ -name: CI - -on: - schedule: - - cron: "00 19 * * *" # run ci periodically at 3 am - pull_request: - branches: [main] - -jobs: - commit: - name: Commit Message Validation - runs-on: ubuntu-latest - if: ${{ github.event_name != 'schedule' }} - steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - run: git show-ref - - uses: Swatinem/rust-cache@v2 - - uses: actions-rs/install@v0.1 - with: - crate: git-cz - version: latest - - name: Validate commit messages - run: git-cz check ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} - - spell-check: - name: Spell Check - runs-on: ubuntu-latest - steps: - - name: Checkout Actions Repository - uses: actions/checkout@v2 - - - name: Check Spelling - uses: crate-ci/typos@master diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml new file mode 100644 index 0000000..4bb5f09 --- /dev/null +++ b/.github/workflows/pr.yml @@ -0,0 +1,39 @@ +name: CI + +on: + pull_request: + branches: [ main ] + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Setup Java + uses: actions/setup-java@v3 + with: + distribution: 'zulu' + java-version: '11' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + + - name: Start the cluster + run: docker compose -f ci/docker-compose.yml up -d + + - name: Run test + run: ./gradlew test + + spell-check: + name: Spell Check + runs-on: ubuntu-latest + steps: + - name: Checkout Actions Repository + uses: actions/checkout@v2 + + - name: Check Spelling + uses: crate-ci/typos@master diff --git a/build.gradle.kts b/build.gradle.kts index ed8d2aa..9302f6f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -2,7 +2,7 @@ import com.adarshr.gradle.testlogger.TestLoggerExtension import com.adarshr.gradle.testlogger.theme.ThemeType group = "cloud.xline" -version = "1.0-SNAPSHOT" +version = "0.1.0-SNAPSHOT" buildscript { repositories { @@ -27,6 +27,7 @@ subprojects { apply(plugin = "java-library") apply(plugin = "org.gradle.test-retry") apply(plugin = "com.adarshr.test-logger") + apply(plugin = "maven-publish") tasks { named("compileJava") { @@ -47,8 +48,26 @@ subprojects { } } - extensions.getByType().apply { + configure { + publications { + create("maven") { + groupId = rootProject.group.toString() + version = rootProject.version.toString() + + from(components["java"]) + + } + } + } + + configure { + withSourcesJar() + withJavadocJar() + } + + configure { theme = ThemeType.MOCHA_PARALLEL showStandardStreams = false } + } diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml new file mode 100644 index 0000000..8c9b23b --- /dev/null +++ b/ci/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.9' + +networks: + xline_network: + driver: bridge + ipam: + driver: default + config: + - subnet: "172.18.0.0/16" + +services: + node1: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.2 + volumes: + - .:/mnt + ports: + - "2379:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node1 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + --is-leader + + node2: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.3 + volumes: + - .:/mnt + ports: + - "2380:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node2 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + + node3: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.4 + volumes: + - .:/mnt + ports: + - "2381:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node3 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem diff --git a/ci/private.pem b/ci/private.pem new file mode 100644 index 0000000..a888426 --- /dev/null +++ b/ci/private.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCnAxxSXJYWZCKr +6f6j0HRUwkhX/0+GXjEclWoLA5+KZAuWMSu8bz6X+IScv4vNwORlGSWOnrz+8mb2 +I0F6teVZWfWFqsnyWk7IxM+h9yTg7aY/8685YfWTL7fpWq1/3Fniz4QbsYFuzB1V +gaZ5fD2CSYIKzSD+qVSlXF25JDFHV7b2OdHrX0UKZOTWY/VE//STt+PJKdX9R3pl +kGwAzJIkkcAZy0vhvqT3ASTgXchNeN8wGYYb3YirkqIsQB5Xcs1R1W+yz+IrVa6/ +0WMcyE6qtJPZ0lviyT0nHV/pZjXuD4B0aja/1fk/HmXDPMjpK1BuCBTStM/KlcrA +oAxo+YDhAgMBAAECggEAIyJhY+Y8YMuCC753JkklH+ubQn/gX/kSxduc6mJBvuBb +G6aOd97DQT8zzrHxHEDXC3ml0AIO6mdeR6uVC9aWQBzPrOYIA+cBqfTVZVJTvMnh +7pQ6KY01F1izjPDZjQtzEWbseNL30rI3/ZP/zJDZc745EEKlDU3cE8mBogA+Ka6w +GLozT9qQf8knBrtzxH6SvrZpfaRlP95is82b4IuPhqYdG7dVYFTALE1MyVrCbS4Y +KytjNLgwp1bIQtWrzMebBGoiU+DvDcRY8zvOfFupDwpYCt3p1aU5wyYYdr74esV7 +jjqHj89Ua65JHJ3XnMAaMc4dHM2FsGqMsOv/DDKInQKBgQDawckQEekx0QuP3eJP +GWdZ87oc+FVjDe3bYhAnCf/yXRJoqcs5vr1m1yCXFfsjbQFYHWXR9AUtNn5HCwOZ +zoT1Mv96fXBVGQORgzvlUWS43uKpfIPDVv2I6ZcKSIQAGOgcWYvmBDhYqPHgmx3o +VSrNGWtLdyw3rD1J6O+1RwtbiwKBgQDDchmY59EXBiTvlyT3Qjl0vZFMHa+TElbh +ikNtYltbUHtamOXZzpdk/KA7X2dYi0QpVfbbpfP/ly5lYvgZwl8h90Obopru+ACM +ndlKBfNQYArmWY6bJ2CwF7j1aTCCHZuVuX6/pzFVStRcssn15uoVaIyKd/MhJzLF +S3ertQkSwwKBgAniMYRhWsjeaghQ/RWXzzyYL3N5oNn92h5MWvB4mjDIFbnW2hC8 +1m/cDmPlIVijZyklAuGuhcFaMfBhxgLf+s/dQv+0xSuDGs8rP7yHpeZYY6NGtelQ +d9oEu8dCKXybo3kMbq6wyB7xWyRLvdkuZ+WmXVumgb/uL0K0nIfzMscrAoGAeA1e +K845YSslBQaSbk7/e/X1iguyDWT2eRO01zvTYgPNwZipl2CPHjkPM2km0fy5oaps +N/94IUd7+EsSmsAKL5LytGbtRFyR+c376rw8+OIFz/iy4BsQCRqJQjWa1lHZf96x +PIg2hW2xhD9OTv3IS94sdeG4NmUdipMQryhEqoECgYEAkvXOg66IAVTrO6qgoyl5 +42oufa/QE+qOAYoQEpmx3SZx6tMkycfAQqUHYcXhW1HNjyGbbg/sl13yddnPQqig ++ObtQNSIqGZWCc/HIqM//pPI3MHPhWARMOmAbk0I1mT0QKhuFfSugV2xb1Dj/Rvf +0VdB8txY+5Wz6zP1F2g46gM= +-----END PRIVATE KEY----- diff --git a/ci/public.pem b/ci/public.pem new file mode 100644 index 0000000..4c52eb6 --- /dev/null +++ b/ci/public.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApwMcUlyWFmQiq+n+o9B0 +VMJIV/9Phl4xHJVqCwOfimQLljErvG8+l/iEnL+LzcDkZRkljp68/vJm9iNBerXl +WVn1harJ8lpOyMTPofck4O2mP/OvOWH1ky+36Vqtf9xZ4s+EG7GBbswdVYGmeXw9 +gkmCCs0g/qlUpVxduSQxR1e29jnR619FCmTk1mP1RP/0k7fjySnV/Ud6ZZBsAMyS +JJHAGctL4b6k9wEk4F3ITXjfMBmGG92Iq5KiLEAeV3LNUdVvss/iK1Wuv9FjHMhO +qrST2dJb4sk9Jx1f6WY17g+AdGo2v9X5Px5lwzzI6StQbggU0rTPypXKwKAMaPmA +4QIDAQAB +-----END PUBLIC KEY----- diff --git a/jxline-core/build.gradle.kts b/jxline-core/build.gradle.kts index 530f2c8..afb8fa9 100644 --- a/jxline-core/build.gradle.kts +++ b/jxline-core/build.gradle.kts @@ -3,4 +3,5 @@ dependencies { api(libs.jetcd) testImplementation(libs.bundles.testing) + testRuntimeOnly(libs.bundles.log4j) } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Auth.java b/jxline-core/src/main/java/cloud/xline/jxline/Auth.java new file mode 100644 index 0000000..9f1b5e9 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/Auth.java @@ -0,0 +1,139 @@ +package cloud.xline.jxline; + +import cloud.xline.jxline.auth.*; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.CloseableClient; + +import java.util.concurrent.CompletableFuture; + +public interface Auth extends CloseableClient { + /** + * enables auth of an etcd cluster. + * + * @return the response. + */ + CompletableFuture authEnable(); + + /** + * disables auth of an etcd cluster. + * + * @return the response. + */ + CompletableFuture authDisable(); + + /** + * adds a new user to an etcd cluster. + * + * @param user the user + * @param password the password + * @return the response. + */ + CompletableFuture userAdd(ByteSequence user, ByteSequence password); + + /** + * deletes a user from an etcd cluster. + * + * @param user the user + * @return the response. + */ + CompletableFuture userDelete(ByteSequence user); + + /** + * changes a password of a user. + * + * @param user the user + * @param password the password + * @return the response. + */ + CompletableFuture userChangePassword( + ByteSequence user, ByteSequence password); + + /** + * gets detailed information of a user. + * + * @param user the user + * @return the response. + */ + CompletableFuture userGet(ByteSequence user); + + /** + * gets a list of all users. + * + * @return the response. + */ + CompletableFuture userList(); + + /** + * grants a role to a user. + * + * @param user the user + * @param role the role to grant + * @return the response. + */ + CompletableFuture userGrantRole( + ByteSequence user, ByteSequence role); + + /** + * revokes a role of a user. + * + * @param user the user + * @param role the role to revoke + * @return the response. + */ + CompletableFuture userRevokeRole( + ByteSequence user, ByteSequence role); + + /** + * adds a new role to an etcd cluster. + * + * @param role the role to add + * @return the response. + */ + CompletableFuture roleAdd(ByteSequence role); + + /** + * grants a permission to a role. + * + * @param role the role + * @param key the key + * @param rangeEnd the range end + * @param permType the type + * @return the response. + */ + CompletableFuture roleGrantPermission( + ByteSequence role, ByteSequence key, ByteSequence rangeEnd, Permission.Type permType); + + /** + * gets detailed information of a role. + * + * @param role the role to get + * @return the response. + */ + CompletableFuture roleGet(ByteSequence role); + + /** + * gets a list of all roles. + * + * @return the response. + */ + CompletableFuture roleList(); + + /** + * revokes a permission from a role. + * + * @param role the role + * @param key the key + * @param rangeEnd the range end + * @return the response. + */ + CompletableFuture roleRevokePermission( + ByteSequence role, ByteSequence key, ByteSequence rangeEnd); + + /** + * RoleDelete deletes a role. + * + * @param role the role to delete. + * @return the response. + */ + CompletableFuture roleDelete(ByteSequence role); +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Client.java b/jxline-core/src/main/java/cloud/xline/jxline/Client.java index e07a3ae..fe5eb61 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/Client.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/Client.java @@ -1,6 +1,12 @@ package cloud.xline.jxline; -public interface Client extends io.etcd.jetcd.Client { +public interface Client { + + ProtocolClient getProtocolClient(); + + Auth getAuthClient(); + + KV getKVClient(); /** * Override the jetcd.cloud.xline.client.Client.builder diff --git a/jxline-core/src/main/java/cloud/xline/jxline/KV.java b/jxline-core/src/main/java/cloud/xline/jxline/KV.java new file mode 100644 index 0000000..da6ecda --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/KV.java @@ -0,0 +1,112 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline; + +import cloud.xline.jxline.kv.*; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.CompactOption; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.support.CloseableClient; + +import java.util.concurrent.CompletableFuture; + +/** Interface of kv client talking to xline. */ +public interface KV extends CloseableClient { + + /** + * put a key-value pair into etcd. + * + * @param key key in ByteSequence + * @param value value in ByteSequence + * @return PutResponse + */ + CompletableFuture put(ByteSequence key, ByteSequence value); + + /** + * put a key-value pair into etcd with option. + * + * @param key key in ByteSequence + * @param value value in ByteSequence + * @param option PutOption + * @return PutResponse + */ + CompletableFuture put(ByteSequence key, ByteSequence value, PutOption option); + + /** + * retrieve value for the given key. + * + * @param key key in ByteSequence + * @return GetResponse + */ + CompletableFuture get(ByteSequence key); + + /** + * retrieve keys with GetOption. + * + * @param key key in ByteSequence + * @param option GetOption + * @return GetResponse + */ + CompletableFuture get(ByteSequence key, GetOption option); + + /** + * delete a key. + * + * @param key key in ByteSequence + * @return DeleteResponse + */ + CompletableFuture delete(ByteSequence key); + + /** + * delete a key or range with option. + * + * @param key key in ByteSequence + * @param option DeleteOption + * @return DeleteResponse + */ + CompletableFuture delete(ByteSequence key, DeleteOption option); + + /** + * compact etcd KV history before the given rev. + * + *

All superseded keys with a revision less than the compaction revision will be removed. + * + * @param rev the revision to compact. + * @return CompactResponse + */ + CompletableFuture compact(long rev); + + /** + * compact etcd KV history before the given rev with option. + * + *

All superseded keys with a revision less than the compaction revision will be removed. + * + * @param rev etcd revision + * @param option CompactOption + * @return CompactResponse + */ + CompletableFuture compact(long rev, CompactOption option); + + /** + * creates a transaction. + * + * @return a Txn + */ + Txn txn(); +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/KeyValue.java b/jxline-core/src/main/java/cloud/xline/jxline/KeyValue.java new file mode 100644 index 0000000..52bb52a --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/KeyValue.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.Util; + +/** Etcd key value pair. */ +public class KeyValue { + + private final com.xline.protobuf.KeyValue kv; + private final ByteSequence unprefixedKey; + private final ByteSequence value; + + public KeyValue(com.xline.protobuf.KeyValue kv, ByteSequence namespace) { + this.kv = kv; + this.value = ByteSequence.from(kv.getValue()); + + this.unprefixedKey = + ByteSequence.from( + kv.getKey().isEmpty() + ? kv.getKey() + : Util.unprefixNamespace(kv.getKey(), namespace)); + } + + /** + * Returns the key + * + * @return the key. + */ + public ByteSequence getKey() { + return unprefixedKey; + } + + /** + * Returns the value + * + * @return the value. + */ + public ByteSequence getValue() { + return value; + } + + /** + * Returns the create revision. + * + * @return the create revision. + */ + public long getCreateRevision() { + return kv.getCreateRevision(); + } + + /** + * Returns the mod revision. + * + * @return the mod revision. + */ + public long getModRevision() { + return kv.getModRevision(); + } + + /** + * Returns the version. + * + * @return the version. + */ + public long getVersion() { + return kv.getVersion(); + } + + /** + * Returns the lease. + * + * @return the lease. + */ + public long getLease() { + return kv.getLease(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/ProtocolClient.java b/jxline-core/src/main/java/cloud/xline/jxline/ProtocolClient.java new file mode 100644 index 0000000..d5e7cf1 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/ProtocolClient.java @@ -0,0 +1,14 @@ +package cloud.xline.jxline; + +import com.xline.protobuf.Command; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.support.CloseableClient; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +public interface ProtocolClient extends CloseableClient { + CompletableFuture propose( + Command cmd, boolean useFastPath, BiFunction convert); +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java new file mode 100644 index 0000000..635fb5e --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java @@ -0,0 +1,102 @@ +/* + * Copyright 2016-2023 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline; + +import cloud.xline.jxline.kv.TxnResponse; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.Op; + +import java.util.concurrent.CompletableFuture; + +/** + * Txn is the interface that wraps mini-transactions. + * + *

Usage examples

+ * + *
{@code
+ * txn.If(
+ *     new Cmp(KEY, Cmp.Op.GREATER, CmpTarget.value(VALUE)),
+ *     new Cmp(KEY, cmp.Op.EQUAL, CmpTarget.version(2))).Then(
+ *         Op.put(KEY2, VALUE2, PutOption.DEFAULT),
+ *         Op.put(KEY3, VALUE3, PutOption.DEFAULT))
+ *     .Else(
+ *         Op.put(KEY4, VALUE4, PutOption.DEFAULT),
+ *         Op.put(KEY4, VALUE4, PutOption.DEFAULT))
+ *     .commit();
+ * }
+ * + *

Txn also supports If, Then, and Else chaining. e.g. + * + *

{@code
+ * txn.If(
+ *     new Cmp(KEY, Cmp.Op.GREATER, CmpTarget.value(VALUE))).If(
+ *         new Cmp(KEY, cmp.Op.EQUAL, CmpTarget.version(VERSION)))
+ *     .Then(
+ *         Op.put(KEY2, VALUE2, PutOption.DEFAULT))
+ *     .Then(
+ *         Op.put(KEY3, VALUE3, PutOption.DEFAULT))
+ *     .Else(
+ *         Op.put(KEY4, VALUE4, PutOption.DEFAULT))
+ *     .Else(
+ *         Op.put(KEY4, VALUE4, PutOption.DEFAULT))
+ *     .commit();
+ * }
+ */ +public interface Txn { + // CHECKSTYLE:OFF + /** + * takes a list of comparison. If all comparisons passed in succeed, the operations passed into + * Then() will be executed. Or the operations passed into Else() will be executed. + * + * @param cmps the comparisons + * @return this object + */ + Txn If(Cmp... cmps); + + // CHECKSTYLE:ON + + // CHECKSTYLE:OFF + /** + * takes a list of operations. The Ops list will be executed, if the comparisons passed in If() + * succeed. + * + * @param ops the operations + * @return this object + */ + Txn Then(Op... ops); + + // CHECKSTYLE:ON + + // CHECKSTYLE:OFF + /** + * takes a list of operations. The Ops list will be executed, if the comparisons passed in If() + * fail. + * + * @param ops the operations + * @return this object + */ + Txn Else(Op... ops); + + // CHECKSTYLE:ON + + /** + * tries to commit the transaction. + * + * @return a TxnResponse wrapped in CompletableFuture + */ + CompletableFuture commit(); +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthDisableResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthDisableResponse.java new file mode 100644 index 0000000..e7e9a7b --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthDisableResponse.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +/** AuthDisableResponse returned by {@link Auth#authDisable()} contains a header. */ +public class AuthDisableResponse extends AbstractResponse { + + public AuthDisableResponse(com.xline.protobuf.AuthDisableResponse response) { + super(response, response.getHeader()); + } + + public AuthDisableResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthDisableResponse, + com.xline.protobuf.AuthDisableResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthEnableResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthEnableResponse.java new file mode 100644 index 0000000..b11f3ea --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthEnableResponse.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +/** AuthEnableResponse returned by {@link Auth#authEnable()} call contains a header. */ +public class AuthEnableResponse extends AbstractResponse { + + public AuthEnableResponse(com.xline.protobuf.AuthEnableResponse response) { + super(response, response.getHeader()); + } + + public AuthEnableResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthEnableResponse, + com.xline.protobuf.AuthEnableResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleAddResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleAddResponse.java new file mode 100644 index 0000000..d43342e --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleAddResponse.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** AuthRoleAddResponse returned by {@link Auth#roleAdd(ByteSequence)} contains a header. */ +public class AuthRoleAddResponse extends AbstractResponse { + + public AuthRoleAddResponse(com.xline.protobuf.AuthRoleAddResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleAddResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleAddResponse, + com.xline.protobuf.AuthRoleAddResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleDeleteResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleDeleteResponse.java new file mode 100644 index 0000000..d117a03 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleDeleteResponse.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** AuthRoleDeleteResponse returned by {@link Auth#roleDelete(ByteSequence)} contains a header. */ +public class AuthRoleDeleteResponse + extends AbstractResponse { + + public AuthRoleDeleteResponse(com.xline.protobuf.AuthRoleDeleteResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleDeleteResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleDeleteResponse, + com.xline.protobuf.AuthRoleDeleteResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGetResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGetResponse.java new file mode 100644 index 0000000..dbf805a --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGetResponse.java @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * AuthRoleGetResponse returned by {@link Auth#roleGet(ByteSequence)} contains a header and a list + * of permissions. + */ +public class AuthRoleGetResponse extends AbstractResponse { + + private List permissions; + + public AuthRoleGetResponse(com.xline.protobuf.AuthRoleGetResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleGetResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleGetResponse, + com.xline.protobuf.AuthRoleGetResponse::getHeader); + } + + private static Permission toPermission(com.xline.protobuf.Permission perm) { + ByteSequence key = ByteSequence.from(perm.getKey()); + ByteSequence rangeEnd = ByteSequence.from(perm.getRangeEnd()); + + Permission.Type type; + switch (perm.getPermType()) { + case READ: + type = Permission.Type.READ; + break; + case WRITE: + type = Permission.Type.WRITE; + break; + case READWRITE: + type = Permission.Type.READWRITE; + break; + default: + type = Permission.Type.UNRECOGNIZED; + } + + return new Permission(type, key, rangeEnd); + } + + public synchronized List getPermissions() { + if (permissions == null) { + permissions = + getResponse().getPermList().stream() + .map(AuthRoleGetResponse::toPermission) + .collect(Collectors.toList()); + } + + return permissions; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGrantPermissionResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGrantPermissionResponse.java new file mode 100644 index 0000000..d3c694e --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleGrantPermissionResponse.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** + * AuthRoleGrantPermissionResponse returned by {@link Auth#roleGrantPermission(ByteSequence, + * ByteSequence, ByteSequence, Permission.Type)} contains a header. + */ +public class AuthRoleGrantPermissionResponse + extends AbstractResponse { + + public AuthRoleGrantPermissionResponse( + com.xline.protobuf.AuthRoleGrantPermissionResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleGrantPermissionResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleGrantPermissionResponse, + com.xline.protobuf.AuthRoleGrantPermissionResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleListResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleListResponse.java new file mode 100644 index 0000000..9a5eae7 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleListResponse.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +import java.util.List; + +/** + * AuthRoleListResponse returned by {@link Auth#roleList()} contains a header and a list of roles. + */ +public class AuthRoleListResponse + extends AbstractResponse { + + public AuthRoleListResponse(com.xline.protobuf.AuthRoleListResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleListResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleListResponse, + com.xline.protobuf.AuthRoleListResponse::getHeader); + } + + /** + * Returns a list of roles. + * + * @return the roles. + */ + public List getRoles() { + return getResponse().getRolesList(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleRevokePermissionResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleRevokePermissionResponse.java new file mode 100644 index 0000000..7572e16 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthRoleRevokePermissionResponse.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +/** AuthRoleRevokePermissionResponse */ +public class AuthRoleRevokePermissionResponse + extends AbstractResponse { + + public AuthRoleRevokePermissionResponse( + com.xline.protobuf.AuthRoleRevokePermissionResponse response) { + super(response, response.getHeader()); + } + + public AuthRoleRevokePermissionResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthRoleRevokePermissionResponse, + com.xline.protobuf.AuthRoleRevokePermissionResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserAddResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserAddResponse.java new file mode 100644 index 0000000..63b43a7 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserAddResponse.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** + * AuthUserAddResponse returned by {@link Auth#userAdd(ByteSequence, ByteSequence)} contains a + * header. + */ +public class AuthUserAddResponse extends AbstractResponse { + + public AuthUserAddResponse(com.xline.protobuf.AuthUserAddResponse response) { + super(response, response.getHeader()); + } + + public AuthUserAddResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserAddResponse, + com.xline.protobuf.AuthUserAddResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserChangePasswordResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserChangePasswordResponse.java new file mode 100644 index 0000000..c01c669 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserChangePasswordResponse.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +/** AuthUserChangePasswordResponse */ +public class AuthUserChangePasswordResponse + extends AbstractResponse { + + public AuthUserChangePasswordResponse( + com.xline.protobuf.AuthUserChangePasswordResponse response) { + super(response, response.getHeader()); + } + + public AuthUserChangePasswordResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserChangePasswordResponse, + com.xline.protobuf.AuthUserChangePasswordResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserDeleteResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserDeleteResponse.java new file mode 100644 index 0000000..745ab18 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserDeleteResponse.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** AuthUserDeleteResponse returned by {@link Auth#userDelete(ByteSequence)} contains a header. */ +public class AuthUserDeleteResponse + extends AbstractResponse { + + public AuthUserDeleteResponse(com.xline.protobuf.AuthUserDeleteResponse response) { + super(response, response.getHeader()); + } + + public AuthUserDeleteResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserDeleteResponse, + com.xline.protobuf.AuthUserDeleteResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGetResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGetResponse.java new file mode 100644 index 0000000..b6ec915 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGetResponse.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +import java.util.List; + +/** + * AuthUserGetResponse returned by {@link Auth#userGet(ByteSequence)} contains a header and a list + * of roles associated with the user. + */ +public class AuthUserGetResponse extends AbstractResponse { + + public AuthUserGetResponse(com.xline.protobuf.AuthUserGetResponse response) { + super(response, response.getHeader()); + } + + public AuthUserGetResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserGetResponse, + com.xline.protobuf.AuthUserGetResponse::getHeader); + } + + /** + * Returns a list of roles. + * + * @return the roles. + */ + public List getRoles() { + return getResponse().getRolesList(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGrantRoleResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGrantRoleResponse.java new file mode 100644 index 0000000..f285167 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserGrantRoleResponse.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +/** AuthUserGrantRoleResponse */ +public class AuthUserGrantRoleResponse + extends AbstractResponse { + + public AuthUserGrantRoleResponse(com.xline.protobuf.AuthUserGrantRoleResponse response) { + super(response, response.getHeader()); + } + + public AuthUserGrantRoleResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserGrantRoleResponse, + com.xline.protobuf.AuthUserGrantRoleResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserListResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserListResponse.java new file mode 100644 index 0000000..01fa700 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserListResponse.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; + +import java.util.List; + +/** + * AuthUserListResponse returned by {@link Auth#userList()} contains a header and a list of users. + */ +public class AuthUserListResponse + extends AbstractResponse { + + public AuthUserListResponse(com.xline.protobuf.AuthUserListResponse response) { + super(response, response.getHeader()); + } + + public AuthUserListResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserListResponse, + com.xline.protobuf.AuthUserListResponse::getHeader); + } + + /** + * Returns a list of users. + * + * @return the users. + */ + public List getUsers() { + return getResponse().getUsersList(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserRevokeRoleResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserRevokeRoleResponse.java new file mode 100644 index 0000000..e397611 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/AuthUserRevokeRoleResponse.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +/** + * AuthUserRevokeRoleResponse returned by {@link Auth#userRevokeRole(ByteSequence, ByteSequence)} + * contains a header. + */ +public class AuthUserRevokeRoleResponse + extends AbstractResponse { + + public AuthUserRevokeRoleResponse(com.xline.protobuf.AuthUserRevokeRoleResponse response) { + super(response, response.getHeader()); + } + + public AuthUserRevokeRoleResponse(CommandResponse sr, SyncResponse asr) { + super( + sr, + asr, + CommandResponse::getAuthUserRevokeRoleResponse, + com.xline.protobuf.AuthUserRevokeRoleResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/auth/Permission.java b/jxline-core/src/main/java/cloud/xline/jxline/auth/Permission.java new file mode 100644 index 0000000..7d6d5d5 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/auth/Permission.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.auth; + +import io.etcd.jetcd.ByteSequence; + +/** + * represents a permission over a range of keys. + */ +public class Permission { + + private final Type permType; + private final ByteSequence key; + private final ByteSequence rangeEnd; + + public enum Type { + READ, WRITE, READWRITE, UNRECOGNIZED, + } + + public Permission(Type permType, ByteSequence key, ByteSequence rangeEnd) { + this.permType = permType; + this.key = key; + this.rangeEnd = rangeEnd; + } + + /** + * Returns the type of Permission: READ, WRITE, READWRITE, or UNRECOGNIZED. + * + * @return the permission type. + */ + public Type getPermType() { + return permType; + } + + public ByteSequence getKey() { + return key; + } + + public ByteSequence getRangeEnd() { + return rangeEnd; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java new file mode 100644 index 0000000..6e8738e --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java @@ -0,0 +1,110 @@ +package cloud.xline.jxline.exceptions; + +import com.curp.protobuf.CurpError; +import com.google.protobuf.Empty; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; + +import javax.annotation.Nullable; + +public class CurpException extends RuntimeException { + private static final CurpError DEFAULT_CURP_ERROR = + CurpError.newBuilder().setRpcTransport(Empty.newBuilder().build()).build(); + + private static final Metadata.Key STATUS_DETAILS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", ProtoUtils.metadataMarshaller(DEFAULT_CURP_ERROR)); + + private final CurpError error; + + public CurpException(CurpError error) { + this.error = error; + } + + public enum Priority { + LOW(1), + HIGH(2); + + private final int value; + + Priority(int value) { + this.value = value; + } + + public int value() { + return value; + } + } + + public Priority priority() { + if (this.error.hasDuplicated() + || this.error.hasShuttingDown() + || this.error.hasInvalidConfig() + || this.error.hasNodeAlreadyExists() + || this.error.hasNodeNotExists() + || this.error.hasLearnerNotCatchUp() + || this.error.hasExpiredClientId() + || this.error.hasRedirect()) { + return Priority.HIGH; + } + if (this.error.hasRpcTransport() + || this.error.hasInternal() + || this.error.hasKeyConflict() + || this.error.hasLeaderTransfer()) { + return Priority.LOW; + } + throw new RuntimeException("unknown curp error"); + } + + public boolean shouldAbortFastRound() { + if (this.error.hasDuplicated() + || this.error.hasShuttingDown() + || this.error.hasInvalidConfig() + || this.error.hasNodeAlreadyExists() + || this.error.hasNodeNotExists() + || this.error.hasLearnerNotCatchUp() + || this.error.hasExpiredClientId() + || this.error.hasRedirect()) { + return true; + } + return false; + } + + public boolean shouldAbortSlowRound() { + if (this.error.hasShuttingDown() + || this.error.hasInvalidConfig() + || this.error.hasNodeAlreadyExists() + || this.error.hasNodeNotExists() + || this.error.hasLearnerNotCatchUp() + || this.error.hasExpiredClientId() + || this.error.hasRedirect() + || this.error.hasWrongClusterVersion()) { + return true; + } + return false; + } + + public static CurpException toCurpException(Throwable throwable) { + if (throwable instanceof CurpException) { + return (CurpException) throwable; + } + return toCurpException( + Status.fromThrowable(throwable), Status.trailersFromThrowable(throwable)); + } + + public static CurpException toCurpException(Status status, @Nullable Metadata trailers) { + if (status.getCode() == Status.Code.UNAVAILABLE + || status.getCode() == Status.Code.UNKNOWN + || trailers == null) { + return new CurpException(DEFAULT_CURP_ERROR); + } + CurpError curpError = trailers.get(STATUS_DETAILS_KEY); + return new CurpException(curpError); + } + + @Override + public String toString() { + return "CurpError(" + error.toString().replace("\n", "") + ")"; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/XlineException.java b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/XlineException.java new file mode 100644 index 0000000..30fd67c --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/XlineException.java @@ -0,0 +1,38 @@ +package cloud.xline.jxline.exceptions; + +import com.xline.protobuf.ExecuteError; + +import javax.annotation.Nullable; + +public class XlineException extends RuntimeException { + + /** + * {@link ExecuteError} when send invalid request to xline servers, maybe null when there is + * some unknown exception happens in the runtime + */ + private ExecuteError error; + + private XlineException(Throwable cause) { + super(cause); + } + + public XlineException(ExecuteError error) { + this.error = error; + } + + public boolean hasError() { + return error != null; + } + + @Nullable + public ExecuteError getError() { + return error; + } + + public static XlineException toXlineException(Throwable throwable) { + if (throwable instanceof XlineException) { + return (XlineException) throwable; + } + return new XlineException(throwable); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/AbstractResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/AbstractResponse.java new file mode 100644 index 0000000..c9233f5 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/AbstractResponse.java @@ -0,0 +1,95 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.impl; + +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.ResponseHeader; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.Response; + +import java.util.function.Function; + +public class AbstractResponse implements Response { + + private final R response; + private final ResponseHeader responseHeader; + private final Header header; + + public AbstractResponse( + CommandResponse sr, + SyncResponse asr, + Function mapping, + Function headerMapping) { + this.response = mapping.apply(sr); + if (asr != null) { + this.responseHeader = + headerMapping.apply(this.response).toBuilder() + .setRevision(asr.getRevision()) + .build(); + } else { + this.responseHeader = headerMapping.apply(this.response); + } + this.header = new HeaderImpl(); + } + + public AbstractResponse(R response, ResponseHeader header) { + this.response = response; + this.responseHeader = header; + this.header = new HeaderImpl(); + } + + @Override + public Header getHeader() { + return header; + } + + @Override + public String toString() { + return response.toString(); + } + + protected final R getResponse() { + return this.response; + } + + protected final ResponseHeader getResponseHeader() { + return this.responseHeader; + } + + private class HeaderImpl implements Header { + + @Override + public long getClusterId() { + return responseHeader.getClusterId(); + } + + @Override + public long getMemberId() { + return responseHeader.getMemberId(); + } + + @Override + public long getRevision() { + return responseHeader.getRevision(); + } + + @Override + public long getRaftTerm() { + return responseHeader.getRaftTerm(); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthCredential.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthCredential.java new file mode 100644 index 0000000..ac79b9d --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthCredential.java @@ -0,0 +1,115 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.impl; + +import com.google.protobuf.ByteString; +import com.xline.protobuf.AuthenticateRequest; +import com.xline.protobuf.VertxAuthGrpc; +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.stub.MetadataUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; + +import static io.etcd.jetcd.Preconditions.checkArgument; + +class AuthCredential extends CallCredentials { + public static final Metadata.Key TOKEN = + Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER); + + private final ClientConnectionManager manager; + private volatile Metadata meta; + + public AuthCredential(ClientConnectionManager manager) { + this.manager = manager; + } + + @Override + public void applyRequestMetadata( + RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) { + final Metadata meta = this.meta; + + if (meta != null) { + applier.apply(meta); + } else { + authenticate(applier); + } + } + + public void refresh() { + meta = null; + } + + @SuppressWarnings("rawtypes") + private void authenticate(MetadataApplier applier) { + checkArgument(!manager.builder().user().isEmpty(), "username can not be empty."); + checkArgument(!manager.builder().password().isEmpty(), "password can not be empty."); + + VertxAuthGrpc.AuthVertxStub authFutureStub = + VertxAuthGrpc.newVertxStub(this.manager.getInitChannel()); + + List interceptorsChain = new ArrayList<>(); + if (manager.builder().authHeaders() != null) { + Metadata metadata = new Metadata(); + manager.builder() + .authHeaders() + .forEach((BiConsumer) metadata::put); + + interceptorsChain.add(MetadataUtils.newAttachHeadersInterceptor(metadata)); + } + if (manager.builder().authInterceptors() != null) { + interceptorsChain.addAll(manager.builder().authInterceptors()); + } + + if (!interceptorsChain.isEmpty()) { + authFutureStub = + authFutureStub.withInterceptors( + interceptorsChain.toArray(new ClientInterceptor[0])); + } + + final ByteString user = ByteString.copyFrom(this.manager.builder().user().getBytes()); + final ByteString pass = ByteString.copyFrom(this.manager.builder().password().getBytes()); + + AuthenticateRequest request = + AuthenticateRequest.newBuilder().setNameBytes(user).setPasswordBytes(pass).build(); + + try { + authFutureStub + .authenticate(request) + .onFailure( + t -> { + applier.fail(Status.UNAUTHENTICATED.withCause(t)); + }) + .onSuccess( + h -> { + Metadata meta = new Metadata(); + meta.put(TOKEN, h.getToken()); + + this.meta = meta; + + applier.apply(this.meta); + }); + } catch (Exception e) { + applier.fail(Status.UNAUTHENTICATED.withCause(e)); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthImpl.java new file mode 100644 index 0000000..98eda4b --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/AuthImpl.java @@ -0,0 +1,282 @@ +package cloud.xline.jxline.impl; + +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.ProtocolClient; +import cloud.xline.jxline.auth.AuthDisableResponse; +import cloud.xline.jxline.auth.AuthEnableResponse; +import cloud.xline.jxline.auth.AuthRoleAddResponse; +import cloud.xline.jxline.auth.AuthRoleDeleteResponse; +import cloud.xline.jxline.auth.AuthRoleGetResponse; +import cloud.xline.jxline.auth.AuthRoleGrantPermissionResponse; +import cloud.xline.jxline.auth.AuthRoleListResponse; +import cloud.xline.jxline.auth.AuthRoleRevokePermissionResponse; +import cloud.xline.jxline.auth.AuthUserAddResponse; +import cloud.xline.jxline.auth.AuthUserChangePasswordResponse; +import cloud.xline.jxline.auth.AuthUserDeleteResponse; +import cloud.xline.jxline.auth.AuthUserGetResponse; +import cloud.xline.jxline.auth.AuthUserGrantRoleResponse; +import cloud.xline.jxline.auth.AuthUserListResponse; +import cloud.xline.jxline.auth.AuthUserRevokeRoleResponse; +import cloud.xline.jxline.auth.Permission; +import com.google.protobuf.ByteString; +import com.xline.protobuf.*; +import io.etcd.jetcd.ByteSequence; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +public class AuthImpl extends Impl implements Auth { + + private final ProtocolClient protocolClient; + + private final VertxAuthGrpc.AuthVertxStub authStub; + + AuthImpl(ProtocolClient protocolClient, ClientConnectionManager connectionManager) { + super(connectionManager); + this.protocolClient = protocolClient; + this.authStub = connectionManager.newStub(VertxAuthGrpc::newVertxStub); + } + + @Override + public CompletableFuture authEnable() { + AuthEnableRequest req = AuthEnableRequest.getDefaultInstance(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthEnableRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthEnableResponse::new); + } + + @Override + public CompletableFuture authDisable() { + AuthDisableRequest req = AuthDisableRequest.getDefaultInstance(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthDisableRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthDisableResponse::new); + } + + @Override + public CompletableFuture userAdd( + ByteSequence user, ByteSequence password) { + requireNonNull(user, "user can't be null"); + requireNonNull(password, "password can't be null"); + + AuthUserAddRequest req = + AuthUserAddRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(user.getBytes())) + .setPasswordBytes(ByteString.copyFrom(password.getBytes())) + .build(); + return completable(authStub.userAdd(req), AuthUserAddResponse::new); + } + + @Override + public CompletableFuture userDelete(ByteSequence user) { + requireNonNull(user, "user can't be null"); + + AuthUserDeleteRequest req = AuthUserDeleteRequest.newBuilder().build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthUserDeleteRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthUserDeleteResponse::new); + } + + @Override + public CompletableFuture userChangePassword( + ByteSequence user, ByteSequence password) { + requireNonNull(user, "user can't be null"); + requireNonNull(password, "password can't be null"); + + AuthUserChangePasswordRequest req = + AuthUserChangePasswordRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(user.getBytes())) + .setPasswordBytes(ByteString.copyFrom(password.getBytes())) + .build(); + return completable(authStub.userChangePassword(req), AuthUserChangePasswordResponse::new); + } + + @Override + public CompletableFuture userGet(ByteSequence user) { + requireNonNull(user, "user can't be null"); + + AuthUserGetRequest req = + AuthUserGetRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(user.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthUserGetRequest(req)) + .build(); + return protocolClient.propose(command, true, AuthUserGetResponse::new); + } + + @Override + public CompletableFuture userList() { + AuthUserListRequest req = AuthUserListRequest.getDefaultInstance(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthUserListRequest(req)) + .build(); + return protocolClient.propose(command, true, AuthUserListResponse::new); + } + + @Override + public CompletableFuture userGrantRole( + ByteSequence user, ByteSequence role) { + requireNonNull(user, "user can't be null"); + requireNonNull(role, "role can't be null"); + + AuthUserGrantRoleRequest req = + AuthUserGrantRoleRequest.newBuilder() + .setUserBytes(ByteString.copyFrom(user.getBytes())) + .setRoleBytes(ByteString.copyFrom(role.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthUserGrantRoleRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthUserGrantRoleResponse::new); + } + + @Override + public CompletableFuture userRevokeRole( + ByteSequence user, ByteSequence role) { + requireNonNull(user, "user can't be null"); + requireNonNull(role, "role can't be null"); + + AuthUserRevokeRoleRequest req = + AuthUserRevokeRoleRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(user.getBytes())) + .setRoleBytes(ByteString.copyFrom(role.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthUserRevokeRoleRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthUserRevokeRoleResponse::new); + } + + @Override + public CompletableFuture roleAdd(ByteSequence role) { + requireNonNull(role, "role can't be null"); + + AuthRoleAddRequest req = + AuthRoleAddRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(role.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthRoleAddRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthRoleAddResponse::new); + } + + @Override + public CompletableFuture roleGrantPermission( + ByteSequence role, ByteSequence key, ByteSequence rangeEnd, Permission.Type permType) { + requireNonNull(role, "role can't be null"); + requireNonNull(key, "key can't be null"); + requireNonNull(rangeEnd, "rangeEnd can't be null"); + requireNonNull(permType, "permType can't be null"); + + com.xline.protobuf.Permission.Type type; + switch (permType) { + case WRITE: + type = com.xline.protobuf.Permission.Type.WRITE; + break; + case READWRITE: + type = com.xline.protobuf.Permission.Type.READWRITE; + break; + case READ: + type = com.xline.protobuf.Permission.Type.READ; + break; + default: + type = com.xline.protobuf.Permission.Type.UNRECOGNIZED; + break; + } + + com.xline.protobuf.Permission perm = + com.xline.protobuf.Permission.newBuilder() + .setKey(ByteString.copyFrom(key.getBytes())) + .setRangeEnd(ByteString.copyFrom(rangeEnd.getBytes())) + .setPermType(type) + .build(); + AuthRoleGrantPermissionRequest req = + AuthRoleGrantPermissionRequest.newBuilder() + .setNameBytes(ByteString.copyFrom(role.getBytes())) + .setPerm(perm) + .build(); + Command command = + Command.newBuilder() + .setRequest( + RequestWithToken.newBuilder() + .setAuthRoleGrantPermissionRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthRoleGrantPermissionResponse::new); + } + + @Override + public CompletableFuture roleGet(ByteSequence role) { + requireNonNull(role, "role can't be null"); + + AuthRoleGetRequest req = + AuthRoleGetRequest.newBuilder() + .setRoleBytes(ByteString.copyFrom(role.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthRoleGetRequest(req)) + .build(); + return protocolClient.propose(command, true, AuthRoleGetResponse::new); + } + + @Override + public CompletableFuture roleList() { + AuthRoleListRequest req = AuthRoleListRequest.getDefaultInstance(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthRoleListRequest(req)) + .build(); + return protocolClient.propose(command, true, AuthRoleListResponse::new); + } + + @Override + public CompletableFuture roleRevokePermission( + ByteSequence role, ByteSequence key, ByteSequence rangeEnd) { + requireNonNull(role, "role can't be null"); + requireNonNull(key, "key can't be null"); + requireNonNull(rangeEnd, "rangeEnd can't be null"); + + AuthRoleRevokePermissionRequest req = + AuthRoleRevokePermissionRequest.newBuilder() + .setRoleBytes(ByteString.copyFrom(role.getBytes())) + .setKey(ByteString.copyFrom(key.getBytes())) + .setRangeEnd(ByteString.copyFrom(rangeEnd.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest( + RequestWithToken.newBuilder() + .setAuthRoleRevokePermissionRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthRoleRevokePermissionResponse::new); + } + + @Override + public CompletableFuture roleDelete(ByteSequence role) { + requireNonNull(role, "role can't be null"); + + AuthRoleDeleteRequest req = + AuthRoleDeleteRequest.newBuilder() + .setRoleBytes(ByteString.copyFrom(role.getBytes())) + .build(); + Command command = + Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setAuthRoleDeleteRequest(req)) + .build(); + return protocolClient.propose(command, false, AuthRoleDeleteResponse::new); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientConnectionManager.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientConnectionManager.java index 696b8af..a7e2eef 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientConnectionManager.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientConnectionManager.java @@ -5,6 +5,7 @@ import io.etcd.jetcd.ByteSequence; import io.grpc.*; import io.grpc.netty.NegotiationType; +import io.grpc.stub.AbstractStub; import io.netty.channel.ChannelOption; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; @@ -12,16 +13,22 @@ import java.util.concurrent.*; import java.util.function.BiConsumer; +import java.util.function.Function; final class ClientConnectionManager { private final Object lock; private final ClientBuilder builder; private final ExecutorService executorService; + private final AuthCredential credential; private volatile Vertx vertx; + /// Integrated channel + private volatile ManagedChannel initChannel; + ClientConnectionManager(ClientBuilder builder) { this.lock = new Object(); this.builder = builder; + this.credential = new AuthCredential(this); if (builder.executorService() == null) { ThreadFactory backingThreadFactory = Executors.defaultThreadFactory(); @@ -144,6 +151,47 @@ public void start( return channelBuilder; } + public ManagedChannel getInitChannel() { + if (this.initChannel == null) { + synchronized (lock) { + if (this.initChannel == null) { + this.initChannel = defaultChannelBuilder().build(); + } + } + } + return this.initChannel; + } + + /** + * Create stub with saved integrated channel. Used to create some xline servers + * + * @param supplier the stub supplier + * @param the type of stub + * @return the attached stub + */ + > T newStub(Function supplier) { + return newStub(supplier, this.getInitChannel()); + } + + /** + * Create stub with a provided channel. + * + * @param stubCustomizer supplier the stub supplier + * @param channel the channel + * @return the attached stub + */ + > T newStub( + Function stubCustomizer, ManagedChannel channel) { + T stub = stubCustomizer.apply(channel); + if (builder.waitForReady()) { + stub = stub.withWaitForReady(); + } + if (builder.user() != null && builder.password() != null) { + stub = stub.withCallCredentials(this.credential); + } + return stub; + } + Vertx vertx() { if (this.vertx == null) { synchronized (this.lock) { diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientImpl.java index ff00be3..2dc28d5 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ClientImpl.java @@ -1,54 +1,41 @@ package cloud.xline.jxline.impl; -import cloud.xline.jxline.Client; -import cloud.xline.jxline.ClientBuilder; +import cloud.xline.jxline.*; -import io.etcd.jetcd.*; +import io.etcd.jetcd.support.MemorizingClientSupplier; public final class ClientImpl implements Client { - public ClientImpl(ClientBuilder clientBuilder) {} + private final ClientConnectionManager manager; - @Override - public Auth getAuthClient() { - return null; - } - - @Override - public KV getKVClient() { - return null; - } + private final ProtocolClient protocolClient; - @Override - public Cluster getClusterClient() { - return null; - } + private final MemorizingClientSupplier kvClient; - @Override - public Maintenance getMaintenanceClient() { - return null; - } + private final MemorizingClientSupplier authClient; - @Override - public Lease getLeaseClient() { - return null; + public ClientImpl(ClientBuilder clientBuilder) { + this.manager = new ClientConnectionManager(clientBuilder); + this.protocolClient = new ProtocolClientImpl(this.manager); + this.kvClient = + new MemorizingClientSupplier<>(() -> new KVImpl(this.protocolClient, this.manager)); + this.authClient = + new MemorizingClientSupplier<>( + () -> new AuthImpl(this.protocolClient, this.manager)); } @Override - public Watch getWatchClient() { - return null; + public ProtocolClient getProtocolClient() { + return this.protocolClient; } @Override - public Lock getLockClient() { - return null; + public KV getKVClient() { + return this.kvClient.get(); } @Override - public Election getElectionClient() { - return null; + public Auth getAuthClient() { + return this.authClient.get(); } - - @Override - public void close() {} } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java new file mode 100644 index 0000000..38e6d4f --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -0,0 +1,108 @@ +package cloud.xline.jxline.impl; + +import cloud.xline.jxline.KV; +import cloud.xline.jxline.ProtocolClient; +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.*; +import cloud.xline.jxline.op.TxnImpl; +import cloud.xline.jxline.support.Requests; +import com.xline.protobuf.Command; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.CompactOption; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +class KVImpl extends Impl implements KV { + + private final ProtocolClient protocolClient; + + KVImpl(ProtocolClient protocolClient, ClientConnectionManager manager) { + super(manager); + this.protocolClient = protocolClient; + } + + @Override + public CompletableFuture put(ByteSequence key, ByteSequence value) { + return this.put(key, value, PutOption.DEFAULT); + } + + @Override + public CompletableFuture put( + ByteSequence key, ByteSequence value, PutOption option) { + requireNonNull(key, "key should not be null"); + requireNonNull(value, "value should not be null"); + requireNonNull(option, "option should not be null"); + Command cmd = + Requests.mapPutCommand(key, value, option, this.connectionManager().getNamespace()); + return protocolClient.propose( + cmd, + true, + (sr, asr) -> new PutResponse(sr, asr, this.connectionManager().getNamespace())); + } + + @Override + public CompletableFuture get(ByteSequence key) { + requireNonNull(key, "key should not be null"); + return get(key, GetOption.DEFAULT); + } + + @Override + public CompletableFuture get(ByteSequence key, GetOption option) { + requireNonNull(key, "key should not be null"); + requireNonNull(option, "option should not be null"); + Command cmd = + Requests.mapRangeCommand(key, option, this.connectionManager().getNamespace()); + return protocolClient.propose( + cmd, + true, + (sr, asr) -> new GetResponse(sr, asr, this.connectionManager().getNamespace())); + } + + @Override + public CompletableFuture delete(ByteSequence key) { + requireNonNull(key, "key should not be null"); + return delete(key, DeleteOption.DEFAULT); + } + + @Override + public CompletableFuture delete(ByteSequence key, DeleteOption option) { + requireNonNull(key, "key should not be null"); + requireNonNull(option, "option should not be null"); + Command cmd = + Requests.mapDeleteCommand(key, option, this.connectionManager().getNamespace()); + return protocolClient.propose( + cmd, + true, + (sr, asr) -> new DeleteResponse(sr, asr, this.connectionManager().getNamespace())); + } + + @Override + public CompletableFuture compact(long revision) { + return compact(revision, CompactOption.DEFAULT); + } + + @Override + public CompletableFuture compact(long revision, CompactOption option) { + requireNonNull(option, "option should not be null"); + Command cmd = Requests.mapCompactRequest(revision, option); + return protocolClient.propose(cmd, true, CompactResponse::new); + } + + @Override + public Txn txn() { + return TxnImpl.newTxn( + this.connectionManager().getNamespace(), + cmd -> + protocolClient.propose( + cmd, + true, + (sr, asr) -> + new TxnResponse( + sr, asr, this.connectionManager().getNamespace()))); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java new file mode 100644 index 0000000..648fa33 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java @@ -0,0 +1,417 @@ +package cloud.xline.jxline.impl; + +import cloud.xline.jxline.ProtocolClient; +import cloud.xline.jxline.exceptions.CurpException; +import cloud.xline.jxline.exceptions.XlineException; +import cloud.xline.jxline.utils.Invoke; +import cloud.xline.jxline.utils.Pair; +import com.curp.protobuf.*; +import com.google.protobuf.Empty; +import com.google.protobuf.InvalidProtocolBufferException; +import com.xline.protobuf.Command; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.ExecuteError; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.resolver.IPNameResolver; +import io.grpc.ManagedChannel; +import io.vertx.core.Future; + +import javax.annotation.Nullable; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +class ProtocolClientImpl extends Impl implements ProtocolClient { + + private final State state; + + ProtocolClientImpl(ClientConnectionManager connectionManager) { + super(connectionManager); + this.state = getInitState(); + } + + private ProposeId getProposeId() { + UUID uuid = UUID.randomUUID(); + // TODO: obtain from server + long clientId = uuid.getMostSignificantBits(); + // TODO: implement tracker + long seqNum = uuid.getMostSignificantBits(); + return ProposeId.newBuilder().setClientId(clientId).setSeqNum(seqNum).build(); + } + + @Override + public CompletableFuture propose( + Command cmd, + boolean useFastPath, + BiFunction convert) { + ProposeId id = this.getProposeId(); + Executor executor = connectionManager().getExecutorService(); + if (!useFastPath) { + return CompletableFuture.supplyAsync(() -> this.fastRound(id, cmd), executor) + .handleAsync((r, ex) -> this.slowRound(id), executor) + .thenApply(pair -> pair.apply(convert)); + } + CompletionService service = + new ExecutorCompletionService<>(connectionManager().getExecutorService()); + service.submit(() -> convert.apply(this.fastRound(id, cmd), null)); + service.submit(() -> this.slowRound(id).apply(convert)); + + return CompletableFuture.supplyAsync( + () -> { + CurpException exception = null; + for (int i = 0; true; i++) { + try { + return service.take().get(); + } catch (InterruptedException e) { + throw XlineException.toXlineException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof XlineException) { + throw (XlineException) cause; + } + if (cause instanceof CurpException) { + CurpException ex = ((CurpException) cause); + if (ex.shouldAbortSlowRound()) { + throw (CurpException) cause; + } + if (exception == null + || exception.priority().value() <= ex.priority().value()) { + exception = ex; + } + if (i == 1) { + throw exception; + } + continue; + } + throw XlineException.toXlineException(e); + } + } + }, + executor); + } + + Pair slowRound(ProposeId id) { + logger().info(String.format("Slow round start. Propose ID %s.", id)); + WaitSyncedRequest waitSyncReq = + WaitSyncedRequest.newBuilder() + .setProposeId(id) + .setClusterVersion(this.state.getClusterVersion()) + .build(); + try { + WaitSyncedResponse resp = + completable( + mapLeader(stub -> stub.waitSynced(waitSyncReq)), + res -> res, + CurpException::toCurpException) + .get(); + if (resp.getExeResult().hasError()) { + throw new XlineException(ExecuteError.parseFrom(resp.getExeResult().getError())); + } + if (resp.getAfterSyncResult().hasError()) { + throw new XlineException( + ExecuteError.parseFrom(resp.getAfterSyncResult().getError())); + } + CommandResponse er = CommandResponse.parseFrom(resp.getExeResult().getOk()); + SyncResponse asr = SyncResponse.parseFrom(resp.getAfterSyncResult().getOk()); + return new Pair<>(er, asr); + } catch (InterruptedException | InvalidProtocolBufferException e) { + throw XlineException.toXlineException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof CurpException)) { + throw XlineException.toXlineException(cause); + } + throw (CurpException) cause; + } + } + + /** + * Run fastRound + * + * @param id The Proposal id + * @param cmd The command + * @return {@link CommandResponse} + * @throws {@link XlineException} when got serializing error, command execution error or + * unexpected behavior, {@link CurpException} when got curp error + */ + CommandResponse fastRound(ProposeId id, Command cmd) { + logger().info(String.format("Fast round start. Propose ID %s.", id)); + ProposeRequest propReq = + ProposeRequest.newBuilder() + .setCommand(cmd.toByteString()) + .setProposeId(id) + .setClusterVersion(this.state.getClusterVersion()) + .build(); + + Collection stubs = this.state.getStubs().values(); + int okCnt = 0; + int superQuorum = this.superQuorum(stubs.size()); + CommandResponse exeRes = null; + CurpException exception = null; + + CompletionService completionService = + forEachServer( + stubs, + stub -> { + CmdResult cmdResult = + completable( + stub.propose(propReq), + ProposeResponse::getResult, + CurpException::toCurpException) + .get(); + if (cmdResult.hasError()) { + ExecuteError error = ExecuteError.parseFrom(cmdResult.getError()); + throw new XlineException(error); + } + if (cmdResult.hasOk()) { + return CommandResponse.parseFrom(cmdResult.getOk()); + } + // No cmd result + return null; + }); + + for (int i = 0; i < stubs.size(); i++) { + try { + CommandResponse resp = completionService.take().get(); + if (resp != null) { + exeRes = resp; + } + okCnt++; + if (okCnt >= superQuorum && exeRes != null) { + return exeRes; + } + } catch (InterruptedException e) { + // unexpected exception + throw XlineException.toXlineException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + // extract the most inner exception + while (cause instanceof ExecutionException) { + cause = cause.getCause(); + } + // serde error + if (cause instanceof InvalidProtocolBufferException) { + continue; + } + if (!(cause instanceof CurpException)) { + throw XlineException.toXlineException(cause); + } + CurpException ex = (CurpException) cause; + if (ex.shouldAbortFastRound()) { + throw ex; + } + if (exception == null || exception.priority().value() <= ex.priority().value()) { + exception = ex; + } + } + } + if (exception != null) { + throw exception; + } + // We will at least send the request to the leader if no `WrongClusterVersion` returned. + // If no errors occur, the leader should return the ER + // If it is because the super quorum has not been reached, an error will definitely occur. + // Otherwise, there is no leader in the cluster state currently, return wrong cluster + // version + // and attempt to retrieve the cluster state again. + throw new CurpException( + CurpError.newBuilder().setWrongClusterVersion(Empty.newBuilder().build()).build()); + } + + CompletionService forEachServer( + Collection stubs, + Invoke task) { + CompletionService completionService = + new ExecutorCompletionService<>(this.connectionManager().getExecutorService()); + for (VertxProtocolGrpc.ProtocolVertxStub stub : stubs) { + completionService.submit(() -> task.call(stub)); + } + return completionService; + } + + Future mapLeader(Function> task) { + VertxProtocolGrpc.ProtocolVertxStub leaderStub = this.state.getLeaderStub(); + if (leaderStub == null) { + // choose a random leader, it will return redirect error if leader is wrong + // TODO: fetch cluster here. + leaderStub = + this.state.stubs.values() + .toArray(VertxProtocolGrpc.ProtocolVertxStub[]::new)[0]; + } + return task.apply(leaderStub); + } + + int superQuorum(int size) { + int faultTolerance = size - quorum(size); + return faultTolerance + recoverQuorum(size); + } + + int quorum(int size) { + return size / 2 + 1; + } + + int recoverQuorum(int size) { + return quorum(size) / 2 + 1; + } + + // TODO: Need to be refactored within retry policy... + State getInitState() { + VertxProtocolGrpc.ProtocolVertxStub initStub = + connectionManager().newStub(VertxProtocolGrpc::newVertxStub); + FetchClusterResponse response = null; + FetchClusterRequest request = + FetchClusterRequest.newBuilder().setLinearizable(false).build(); + int retries = -1; + do { + retries++; + if (retries > 5) { + throw new RuntimeException("connection failed"); + } + try { + response = completable(initStub.fetchCluster(request)).get(3, TimeUnit.SECONDS); + } catch (Exception e) { + logger().warn("fetch cluster failed, " + e); + } + } while (response == null || !response.hasLeaderId()); + + HashMap stubs = new HashMap<>(); + for (Member member : response.getMembersList()) { + String target = + member.getAddrsList().stream() + .map( + addr -> { + if (!addr.startsWith("http")) { + return URI.create("http://" + addr); + } + return URI.create(addr); + }) + .map(ProtocolClientImpl::getEndpoint) + .distinct() + .collect(Collectors.joining(",")); + String authority = connectionManager().builder().authority(); + String ips = + String.format( + "%s://%s/%s", + IPNameResolver.SCHEME, authority != null ? authority : "", target); + + ManagedChannel channel = connectionManager().defaultChannelBuilder(ips).build(); + VertxProtocolGrpc.ProtocolVertxStub stub = + connectionManager().newStub(VertxProtocolGrpc::newVertxStub, channel); + stubs.put(member.getId(), stub); + } + + return new State( + response.getLeaderId(), response.getTerm(), response.getClusterVersion(), stubs); + } + + private class State { + private final ReadWriteLock lock; + private long leaderId; + private long term; + private long clusterVersion; + private HashMap stubs; + + State( + long leaderId, + long term, + long clusterVersion, + HashMap stubs) { + this.lock = new ReentrantReadWriteLock(); + this.leaderId = leaderId; + this.term = term; + this.clusterVersion = clusterVersion; + this.stubs = stubs; + } + + long getClusterVersion() { + this.lock.readLock().lock(); + long version = this.clusterVersion; + this.lock.readLock().unlock(); + return version; + } + + HashMap getStubs() { + this.lock.readLock().lock(); + HashMap stubs = this.stubs; + this.lock.readLock().unlock(); + return stubs; + } + + long getLeader() { + this.lock.readLock().lock(); + long res = this.leaderId; + this.lock.readLock().unlock(); + return res; + } + + /** + * Get leader stub + * + * @return Leader stub or null if there are some disjoints in local stubs. + */ + @Nullable + VertxProtocolGrpc.ProtocolVertxStub getLeaderStub() { + this.lock.readLock().lock(); + VertxProtocolGrpc.ProtocolVertxStub stub = this.stubs.get(this.leaderId); + this.lock.readLock().unlock(); + return stub; + } + + void checkUpdate(FetchClusterResponse res) { + try { + this.lock.writeLock().lock(); + if (res.getTerm() < this.term) { + return; + } + if (res.hasLeaderId() && this.term < res.getTerm()) { + this.term = res.getTerm(); + this.leaderId = res.getLeaderId(); + logger().info("client term updates to {}", this.term); + logger().info("client leader id updates to {}", this.leaderId); + } + if (res.getClusterVersion() == this.clusterVersion) { + return; + } + this.clusterVersion = res.getClusterVersion(); + HashMap stubs = new HashMap<>(); + for (Member member : res.getMembersList()) { + String target = + member.getAddrsList().stream() + .map(URI::create) + .map(ProtocolClientImpl::getEndpoint) + .distinct() + .collect(Collectors.joining(",")); + String authority = connectionManager().builder().authority(); + String ips = + String.format( + "%s://%s/%s", + IPNameResolver.SCHEME, + authority != null ? authority : "", + target); + + ManagedChannel channel = connectionManager().defaultChannelBuilder(ips).build(); + VertxProtocolGrpc.ProtocolVertxStub stub = + connectionManager().newStub(VertxProtocolGrpc::newVertxStub, channel); + stubs.put(member.getId(), stub); + } + // TODO: do NOT drop the old stubs, instead modify the stubs (use ConcurrentHashMap) + if (!stubs.isEmpty()) { + this.stubs = stubs; + } + } finally { + this.lock.writeLock().unlock(); + } + } + } + + static String getEndpoint(URI uri) { + return uri.getHost() + (uri.getPort() != -1 ? ":" + uri.getPort() : ""); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/kv/CompactResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/kv/CompactResponse.java new file mode 100644 index 0000000..1a34df6 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/kv/CompactResponse.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.kv; + +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.CompactionResponse; +import com.xline.protobuf.SyncResponse; + +public class CompactResponse extends AbstractResponse { + + public CompactResponse(CompactionResponse response) { + super(response, response.getHeader()); + } + + public CompactResponse(CommandResponse sr, SyncResponse asr) { + super(sr, asr, CommandResponse::getCompactionResponse, CompactionResponse::getHeader); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/kv/DeleteResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/kv/DeleteResponse.java new file mode 100644 index 0000000..63ec063 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/kv/DeleteResponse.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.kv; + +import cloud.xline.jxline.KeyValue; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.DeleteRangeResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +import java.util.List; +import java.util.stream.Collectors; + +public class DeleteResponse extends AbstractResponse { + + private final ByteSequence namespace; + + private List prevKvs; + + public DeleteResponse(DeleteRangeResponse deleteRangeResponse, ByteSequence namespace) { + super(deleteRangeResponse, deleteRangeResponse.getHeader()); + this.namespace = namespace; + } + + public DeleteResponse(CommandResponse sr, SyncResponse asr, ByteSequence namespace) { + super(sr, asr, CommandResponse::getDeleteRangeResponse, DeleteRangeResponse::getHeader); + this.namespace = namespace; + } + + /** + * Returns the number of keys deleted by the delete range request. + * + * @return number of deleted items. + */ + public long getDeleted() { + return getResponse().getDeleted(); + } + + /** + * Returns previous key-value pairs. + * + * @return previous kv, + */ + public synchronized List getPrevKvs() { + if (prevKvs == null) { + prevKvs = + getResponse().getPrevKvsList().stream() + .map(kv -> new KeyValue(kv, namespace)) + .collect(Collectors.toList()); + } + + return prevKvs; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/kv/GetResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/kv/GetResponse.java new file mode 100644 index 0000000..593bbd5 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/kv/GetResponse.java @@ -0,0 +1,78 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.kv; + +import cloud.xline.jxline.KeyValue; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.RangeResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +import java.util.List; +import java.util.stream.Collectors; + +public class GetResponse extends AbstractResponse { + + private final ByteSequence namespace; + + private List kvs; + + public GetResponse(RangeResponse rangeResponse, ByteSequence namespace) { + super(rangeResponse, rangeResponse.getHeader()); + this.namespace = namespace; + } + + public GetResponse(CommandResponse sr, SyncResponse asr, ByteSequence namespace) { + super(sr, asr, CommandResponse::getRangeResponse, RangeResponse::getHeader); + this.namespace = namespace; + } + + /** + * Returns a list of key-value pairs matched by the range request. + * + * @return kvs. + */ + public synchronized List getKvs() { + if (kvs == null) { + kvs = + getResponse().getKvsList().stream() + .map(kv -> new KeyValue(kv, namespace)) + .collect(Collectors.toList()); + } + + return kvs; + } + + /** + * Returns if there are more keys to return in the requested range. + * + * @return more. + */ + public boolean isMore() { + return getResponse().getMore(); + } + + /** + * Returns the number of keys within the range when requested. + * + * @return count. + */ + public long getCount() { + return getResponse().getCount(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/kv/PutResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/kv/PutResponse.java new file mode 100644 index 0000000..2d62779 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/kv/PutResponse.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.kv; + +import cloud.xline.jxline.KeyValue; +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +public class PutResponse extends AbstractResponse { + + private final ByteSequence namespace; + + public PutResponse(CommandResponse sr, SyncResponse asr, ByteSequence namespace) { + super(sr, asr, CommandResponse::getPutResponse, com.xline.protobuf.PutResponse::getHeader); + this.namespace = namespace; + } + + public PutResponse(com.xline.protobuf.PutResponse putResponse, ByteSequence namespace) { + super(putResponse, putResponse.getHeader()); + this.namespace = namespace; + } + + /** + * Returns previous key-value pair. + * + * @return prev kv. + */ + public KeyValue getPrevKv() { + return new KeyValue(getResponse().getPrevKv(), namespace); + } + + /** + * Returns whether a previous key-value pair is present. + * + * @return if has prev kv. + */ + public boolean hasPrevKv() { + return getResponse().hasPrevKv(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/kv/TxnResponse.java b/jxline-core/src/main/java/cloud/xline/jxline/kv/TxnResponse.java new file mode 100644 index 0000000..486ead8 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/kv/TxnResponse.java @@ -0,0 +1,143 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.kv; + +import static com.xline.protobuf.ResponseOp.ResponseCase.RESPONSE_DELETE_RANGE; +import static com.xline.protobuf.ResponseOp.ResponseCase.RESPONSE_PUT; +import static com.xline.protobuf.ResponseOp.ResponseCase.RESPONSE_RANGE; +import static com.xline.protobuf.ResponseOp.ResponseCase.RESPONSE_TXN; + +import cloud.xline.jxline.impl.AbstractResponse; +import com.xline.protobuf.CommandResponse; +import com.xline.protobuf.SyncResponse; +import io.etcd.jetcd.ByteSequence; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * TxnResponse returned by a transaction call contains lists of put, get, delete responses + * corresponding to either the compare in txn.IF is evaluated to true or false. + */ +public class TxnResponse extends AbstractResponse { + + private final ByteSequence namespace; + + private List putResponses; + private List getResponses; + private List deleteResponses; + private List txnResponses; + + public TxnResponse(CommandResponse sr, SyncResponse asr, ByteSequence namespace) { + super(sr, asr, CommandResponse::getTxnResponse, com.xline.protobuf.TxnResponse::getHeader); + this.namespace = namespace; + } + + public TxnResponse(com.xline.protobuf.TxnResponse txnResponse, ByteSequence namespace) { + super(txnResponse, txnResponse.getHeader()); + this.namespace = namespace; + } + + /** + * Returns true if the compare evaluated to true or false otherwise. + * + * @return if succeeded. + */ + public boolean isSucceeded() { + return getResponse().getSucceeded(); + } + + /** + * Returns a list of DeleteResponse; empty list if none. + * + * @return delete responses. + */ + public synchronized List getDeleteResponses() { + if (deleteResponses == null) { + deleteResponses = + getResponse().getResponsesList().stream() + .filter( + (responseOp) -> + responseOp.getResponseCase() == RESPONSE_DELETE_RANGE) + .map( + responseOp -> + new DeleteResponse( + responseOp.getResponseDeleteRange(), namespace)) + .collect(Collectors.toList()); + } + + return deleteResponses; + } + + /** + * Returns a list of GetResponse; empty list if none. + * + * @return get responses. + */ + public synchronized List getGetResponses() { + if (getResponses == null) { + getResponses = + getResponse().getResponsesList().stream() + .filter((responseOp) -> responseOp.getResponseCase() == RESPONSE_RANGE) + .map( + responseOp -> + new GetResponse( + responseOp.getResponseRange(), namespace)) + .collect(Collectors.toList()); + } + + return getResponses; + } + + /** + * Returns a list of PutResponse; empty list if none. + * + * @return put responses. + */ + public synchronized List getPutResponses() { + if (putResponses == null) { + putResponses = + getResponse().getResponsesList().stream() + .filter((responseOp) -> responseOp.getResponseCase() == RESPONSE_PUT) + .map( + responseOp -> + new PutResponse(responseOp.getResponsePut(), namespace)) + .collect(Collectors.toList()); + } + + return putResponses; + } + + /** + * Returns a list of TxnResponse; empty list if none. + * + * @return txn responses. + */ + public synchronized List getTxnResponses() { + if (txnResponses == null) { + txnResponses = + getResponse().getResponsesList().stream() + .filter((responseOp) -> responseOp.getResponseCase() == RESPONSE_TXN) + .map( + responseOp -> + new TxnResponse(responseOp.getResponseTxn(), namespace)) + .collect(Collectors.toList()); + } + + return txnResponses; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java new file mode 100644 index 0000000..31fd569 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.Util; + + +/** The compare predicate in {@link Txn}. */ +public class Cmp { + + public enum Op { + EQUAL, + GREATER, + LESS, + NOT_EQUAL + } + + private final ByteString key; + private final Op op; + private final CmpTarget target; + + public Cmp(ByteSequence key, Op compareOp, CmpTarget target) { + this.key = ByteString.copyFrom(key.getBytes()); + this.op = compareOp; + this.target = target; + } + + Compare toCompare(ByteSequence namespace) { + Compare.Builder compareBuilder = + Compare.newBuilder().setKey(Util.prefixNamespace(this.key, namespace)); + switch (this.op) { + case EQUAL: + compareBuilder.setResult(Compare.CompareResult.EQUAL); + break; + case GREATER: + compareBuilder.setResult(Compare.CompareResult.GREATER); + break; + case LESS: + compareBuilder.setResult(Compare.CompareResult.LESS); + break; + case NOT_EQUAL: + compareBuilder.setResult(Compare.CompareResult.NOT_EQUAL); + break; + default: + throw new IllegalArgumentException("Unexpected compare type (" + this.op + ")"); + } + + Compare.CompareTarget target = this.target.getTarget(); + Object value = this.target.getTargetValue(); + + compareBuilder.setTarget(target); + switch (target) { + case VERSION: + compareBuilder.setVersion((Long) value); + break; + case VALUE: + compareBuilder.setValue((ByteString) value); + break; + case MOD: + compareBuilder.setModRevision((Long) value); + break; + case CREATE: + compareBuilder.setCreateRevision((Long) value); + break; + default: + throw new IllegalArgumentException("Unexpected target type (" + target + ")"); + } + + return compareBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java new file mode 100644 index 0000000..170caf2 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java @@ -0,0 +1,123 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; + +/** + * Cmp target used in {@link Txn}. + */ +public abstract class CmpTarget { + + /** + * Cmp on a given version. + * + * @param version version to compare + * @return the version compare target + */ + public static VersionCmpTarget version(long version) { + return new VersionCmpTarget(version); + } + + /** + * Cmp on the create revision. + * + * @param revision the create revision + * @return the create revision compare target + */ + public static CreateRevisionCmpTarget createRevision(long revision) { + return new CreateRevisionCmpTarget(revision); + } + + /** + * Cmp on the modification revision. + * + * @param revision the modification revision + * @return the modification revision compare target + */ + public static ModRevisionCmpTarget modRevision(long revision) { + return new ModRevisionCmpTarget(revision); + } + + /** + * Cmp on the value. + * + * @param value the value to compare + * @return the value compare target + */ + public static ValueCmpTarget value(ByteSequence value) { + return new ValueCmpTarget(ByteString.copyFrom(value.getBytes())); + } + + private final Compare.CompareTarget target; + private final T targetValue; + + protected CmpTarget(Compare.CompareTarget target, T targetValue) { + this.target = target; + this.targetValue = targetValue; + } + + /** + * Get the compare target used for this compare. + * + * @return the compare target used for this compare + */ + public Compare.CompareTarget getTarget() { + return target; + } + + /** + * Get the compare target value of this compare. + * + * @return the compare target value of this compare. + */ + public T getTargetValue() { + return targetValue; + } + + public static final class VersionCmpTarget extends CmpTarget { + + VersionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.VERSION, targetValue); + } + } + + public static final class CreateRevisionCmpTarget extends CmpTarget { + + CreateRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.CREATE, targetValue); + } + } + + public static final class ModRevisionCmpTarget extends CmpTarget { + + ModRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.MOD, targetValue); + } + } + + public static final class ValueCmpTarget extends CmpTarget { + + ValueCmpTarget(ByteString targetValue) { + super(Compare.CompareTarget.VALUE, targetValue); + } + } + +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java new file mode 100644 index 0000000..8117923 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java @@ -0,0 +1,167 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.support.Requests; +import com.google.protobuf.ByteString; +import com.xline.protobuf.DeleteRangeRequest; +import com.xline.protobuf.RequestOp; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.support.Util; + +/** Copied From Etcd Operation. */ +public abstract class Op { + + /** Operation type. */ + public enum Type { + PUT, + RANGE, + DELETE_RANGE, + TXN + } + + protected final Type type; + protected final ByteString key; + + protected Op(Type type, ByteString key) { + this.type = type; + this.key = key; + } + + abstract RequestOp toRequestOp(ByteSequence namespace); + + public static PutOp put(ByteSequence key, ByteSequence value, PutOption option) { + return new PutOp( + ByteString.copyFrom(key.getBytes()), ByteString.copyFrom(value.getBytes()), option); + } + + public static GetOp get(ByteSequence key, GetOption option) { + return new GetOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static DeleteOp delete(ByteSequence key, DeleteOption option) { + return new DeleteOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + return new TxnOp(cmps, thenOps, elseOps); + } + + public static final class PutOp extends Op { + + private final ByteString value; + private final PutOption option; + + private PutOp(ByteString key, ByteString value, PutOption option) { + super(Type.PUT, key); + this.value = value; + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestPut( + Requests.mapPutRequest( + ByteSequence.from(key), + ByteSequence.from(value), + option, + namespace)) + .build(); + } + } + + public static final class GetOp extends Op { + + private final GetOption option; + + private GetOp(ByteString key, GetOption option) { + super(Type.RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestRange( + Requests.mapRangeRequest(ByteSequence.from(key), option, namespace)) + .build(); + } + } + + public static final class DeleteOp extends Op { + + private final DeleteOption option; + + DeleteOp(ByteString key, DeleteOption option) { + super(Type.DELETE_RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestDeleteRange( + DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV())) + .build(); + } + } + + public static final class TxnOp extends Op { + private final Cmp[] cmps; + private final Op[] thenOps; + private final Op[] elseOps; + + private TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + super(Type.TXN, null); + this.cmps = cmps; + this.thenOps = thenOps; + this.elseOps = elseOps; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + TxnRequest.Builder txn = TxnRequest.newBuilder(); + + if (cmps != null) { + for (Cmp cmp : cmps) { + txn.addCompare(cmp.toCompare(namespace)); + } + } + + if (thenOps != null) { + for (Op thenOp : thenOps) { + txn.addSuccess(thenOp.toRequestOp(namespace)); + } + } + + if (elseOps != null) { + for (Op elseOp : elseOps) { + txn.addFailure(elseOp.toRequestOp(namespace)); + } + } + + return RequestOp.newBuilder().setRequestTxn(txn).build(); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java new file mode 100644 index 0000000..f84d241 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -0,0 +1,161 @@ +package cloud.xline.jxline.op; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Stream; + +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.TxnResponse; +import com.xline.protobuf.*; +import io.etcd.jetcd.ByteSequence; + +import com.google.common.annotations.VisibleForTesting; + +/** Build a transaction. */ +public class TxnImpl implements Txn { + + public static TxnImpl newTxn( + ByteSequence namespace, Function> f) { + return new TxnImpl(namespace, f); + } + + @VisibleForTesting + static TxnImpl newTxn(Function> f) { + return newTxn(ByteSequence.EMPTY, f); + } + + private final ByteSequence namespace; + + private final List cmpList = new ArrayList<>(); + private final List successOpList = new ArrayList<>(); + private final List failureOpList = new ArrayList<>(); + private final Function> requestF; + + private boolean seenThen = false; + private boolean seenElse = false; + + private TxnImpl(ByteSequence namespace, Function> f) { + this.requestF = f; + this.namespace = namespace; + } + + @Override + public TxnImpl If(Cmp... cmps) { + return If(Arrays.asList(cmps)); + } + + TxnImpl If(List cmps) { + if (this.seenThen) { + throw new IllegalArgumentException("cannot call If after Then!"); + } + if (this.seenElse) { + throw new IllegalArgumentException("cannot call If after Else!"); + } + + cmpList.addAll(cmps); + return this; + } + + @Override + public TxnImpl Then(Op... ops) { + return Then(Arrays.asList(ops)); + } + + TxnImpl Then(List ops) { + if (this.seenElse) { + throw new IllegalArgumentException("cannot call Then after Else!"); + } + + this.seenThen = true; + + successOpList.addAll(ops); + return this; + } + + @Override + public TxnImpl Else(Op... ops) { + return Else(Arrays.asList(ops)); + } + + TxnImpl Else(List ops) { + this.seenElse = true; + + failureOpList.addAll(ops); + return this; + } + + @Override + public CompletableFuture commit() { + return this.requestF.apply(this.toTxnRequest()); + } + + private Command toTxnRequest() { + TxnRequest.Builder requestBuilder = TxnRequest.newBuilder(); + + for (Cmp c : this.cmpList) { + requestBuilder.addCompare(c.toCompare(namespace)); + } + + for (Op o : this.successOpList) { + requestBuilder.addSuccess(o.toRequestOp(namespace)); + } + + for (Op o : this.failureOpList) { + requestBuilder.addFailure(o.toRequestOp(namespace)); + } + + TxnRequest txnReq = requestBuilder.build(); + + return Command.newBuilder() + .addAllKeys(getTxnReqKeyRanges(txnReq)) + .setRequest(RequestWithToken.newBuilder().setTxnRequest(txnReq).build()) + .build(); + } + + private static List getTxnReqKeyRanges(TxnRequest req) { + List keyRanges = new ArrayList<>(); + req.getCompareList() + .forEach( + cmp -> + keyRanges.add( + KeyRange.newBuilder() + .setKey(cmp.getKey()) + .setRangeEnd(cmp.getRangeEnd()) + .build())); + Stream.concat(req.getSuccessList().stream(), req.getFailureList().stream()) + .forEach( + op -> { + if (op.hasRequestRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestRange().getKey()) + .setRangeEnd(op.getRequestRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestPut()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestPut().getKey()) + .build()); + return; + } + if (op.hasRequestDeleteRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestDeleteRange().getKey()) + .setRangeEnd( + op.getRequestDeleteRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestTxn()) { + keyRanges.addAll(getTxnReqKeyRanges(op.getRequestTxn())); + } + }); + return keyRanges; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java new file mode 100644 index 0000000..bc8b8b2 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java @@ -0,0 +1,170 @@ +package cloud.xline.jxline.support; + +import com.google.protobuf.ByteString; +import com.xline.protobuf.*; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.*; +import io.etcd.jetcd.support.Util; + +import java.util.Optional; +import java.util.function.Consumer; + +public final class Requests { + + public static PutRequest mapPutRequest( + ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { + return PutRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setValue(ByteString.copyFrom(value.getBytes())) + .setLease(option.getLeaseId()) + .setPrevKv(option.getPrevKV()) + .build(); + } + + public static Command mapPutCommand( + ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { + PutRequest req = mapPutRequest(key, value, option, namespace); + return Command.newBuilder() + .addKeys(KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace))) + .setRequest( + RequestWithToken.newBuilder().setPutRequest(req).build()) // TODO: add token + .build(); + } + + public static RangeRequest.Builder mapRangeRequest( + ByteSequence key, GetOption option, ByteSequence namespace) { + return RangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setCountOnly(option.isCountOnly()) + .setLimit(option.getLimit()) + .setRevision(option.getRevision()) + .setKeysOnly(option.isKeysOnly()) + .setSerializable(option.isSerializable()) + .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) + .setSortTarget(toRangeRequestSortTarget(option.getSortField())) + .setMinCreateRevision(option.getMinCreateRevision()) + .setMaxCreateRevision(option.getMaxCreateRevision()) + .setMinModRevision(option.getMinModRevision()) + .setMaxModRevision(option.getMaxModRevision()); + } + + public static Command mapRangeCommand( + ByteSequence key, GetOption option, ByteSequence namespace) { + RangeRequest.Builder builder = mapRangeRequest(key, option, namespace); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); + + defineRangeRequestEnd( + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); + return Command.newBuilder() + .addKeys(keyRange) + .setRequest( + RequestWithToken.newBuilder() + .setRangeRequest(builder.build()) + .build()) // TODO: add token + .build(); + } + + public static DeleteRangeRequest.Builder mapDeleteRequest( + ByteSequence key, DeleteOption option, ByteSequence namespace) { + return DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV()); + } + + public static Command mapDeleteCommand( + ByteSequence key, DeleteOption option, ByteSequence namespace) { + DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); + + defineRangeRequestEnd( + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); + return Command.newBuilder() + .addKeys(keyRange) + .setRequest( + RequestWithToken.newBuilder() + .setDeleteRangeRequest(builder.build()) + .build()) // TODO: add token + .build(); + } + + public static Command mapCompactRequest(long revision, CompactOption option) { + CompactionRequest req = + CompactionRequest.newBuilder() + .setRevision(revision) + .setPhysical(option.isPhysical()) + .build(); + return Command.newBuilder() + .setRequest( + RequestWithToken.newBuilder() + .setCompactionRequest(req) + .build()) // TODO: add token + .build(); + } + + private static RangeRequest.SortOrder toRangeRequestSortOrder(GetOption.SortOrder order) { + switch (order) { + case NONE: + return RangeRequest.SortOrder.NONE; + case ASCEND: + return RangeRequest.SortOrder.ASCEND; + case DESCEND: + return RangeRequest.SortOrder.DESCEND; + default: + return RangeRequest.SortOrder.UNRECOGNIZED; + } + } + + public static RangeRequest.SortTarget toRangeRequestSortTarget(GetOption.SortTarget target) { + switch (target) { + case KEY: + return RangeRequest.SortTarget.KEY; + case CREATE: + return RangeRequest.SortTarget.CREATE; + case MOD: + return RangeRequest.SortTarget.MOD; + case VALUE: + return RangeRequest.SortTarget.VALUE; + case VERSION: + return RangeRequest.SortTarget.VERSION; + default: + return RangeRequest.SortTarget.UNRECOGNIZED; + } + } + + private static void defineRangeRequestEnd( + ByteSequence key, + Optional endKeyOptional, + boolean hasPrefix, + ByteSequence namespace, + Consumer setRangeEndConsumer) { + + if (endKeyOptional.isPresent()) { + setRangeEndConsumer.accept( + Util.prefixNamespaceToRangeEnd( + ByteString.copyFrom(endKeyOptional.get().getBytes()), namespace)); + } else { + if (hasPrefix) { + ByteSequence endKey = OptionsUtil.prefixEndOf(key); + setRangeEndConsumer.accept( + Util.prefixNamespaceToRangeEnd( + ByteString.copyFrom(endKey.getBytes()), namespace)); + } + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/utils/Invoke.java b/jxline-core/src/main/java/cloud/xline/jxline/utils/Invoke.java new file mode 100644 index 0000000..b96e56f --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/utils/Invoke.java @@ -0,0 +1,14 @@ +package cloud.xline.jxline.utils; + +import javax.annotation.Nonnull; + +/** + * Like {@link java.util.function.Function}, but throws Exceptions + * + * @param + * @param + */ +@FunctionalInterface +public interface Invoke { + V call(@Nonnull R r) throws Exception; +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/utils/Pair.java b/jxline-core/src/main/java/cloud/xline/jxline/utils/Pair.java index 457227d..07e0fc2 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/utils/Pair.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/utils/Pair.java @@ -1,12 +1,13 @@ package cloud.xline.jxline.utils; import java.util.Objects; +import java.util.function.BiFunction; public final class Pair { final A a; final B b; - Pair(A a, B b) { + public Pair(A a, B b) { this.a = Objects.requireNonNull(a); this.b = Objects.requireNonNull(b); } @@ -30,6 +31,10 @@ public boolean equals(Object o) { return false; } + public T apply(BiFunction fn) { + return fn.apply(a, b); + } + @Override public int hashCode() { return a.hashCode() ^ b.hashCode(); diff --git a/jxline-core/src/test/java/AuthTest.java b/jxline-core/src/test/java/AuthTest.java new file mode 100644 index 0000000..3e0c035 --- /dev/null +++ b/jxline-core/src/test/java/AuthTest.java @@ -0,0 +1,152 @@ +import cloud.xline.jxline.Auth; +import cloud.xline.jxline.Client; +import cloud.xline.jxline.ClientBuilder; +import cloud.xline.jxline.KV; +import cloud.xline.jxline.auth.AuthRoleListResponse; +import cloud.xline.jxline.auth.Permission; +import io.etcd.jetcd.ByteSequence; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.List; + +import static utils.Utils.*; +import static org.assertj.core.api.Assertions.*; + +@Timeout(value = 20) +public class AuthTest { + private static ClientBuilder clientBuilder; + + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; + + private static final String rootString = "root"; + private static final ByteSequence rootPass = bytesOf("123"); + private static final String rootRoleString = "root"; + private static final String userString = "user"; + private static final String userRoleString = "userRole"; + private static Auth authDisabledAuthClient; + private static KV authDisabledKVClient; + private final ByteSequence rootRoleKey = bytesOf("root"); + private final ByteSequence rootRoleValue = bytesOf("b"); + private final ByteSequence rootRoleKeyRangeBegin = bytesOf("root"); + private final ByteSequence rootRoleKeyRangeEnd = bytesOf("root1"); + private final ByteSequence userRoleKey = bytesOf("foo"); + private final ByteSequence userRoleValue = bytesOf("bar"); + private final ByteSequence userRoleKeyRangeBegin = bytesOf("foo"); + private final ByteSequence userRoleKeyRangeEnd = bytesOf("foo1"); + private final ByteSequence root = bytesOf(rootString); + private final ByteSequence rootRole = bytesOf(rootRoleString); + private final ByteSequence user = bytesOf(userString); + private final ByteSequence userPass = bytesOf("userPass"); + private final ByteSequence userNewPass = bytesOf("newUserPass"); + private final ByteSequence userRole = bytesOf(userRoleString); + + @BeforeAll + static void onConnect() { + clientBuilder = Client.builder().endpoints(INIT_ENDPOINT); + Client clientNoAuth = clientBuilder.build(); + authDisabledKVClient = clientNoAuth.getKVClient(); + authDisabledAuthClient = clientNoAuth.getAuthClient(); + } + + @Test + public void testAuth() throws Exception { + authDisabledAuthClient.roleAdd(rootRole).get(); + authDisabledAuthClient.roleAdd(userRole).get(); + + final AuthRoleListResponse response = authDisabledAuthClient.roleList().get(); + assertThat(response.getRoles()).containsOnly(rootRoleString, userRoleString); + + authDisabledAuthClient + .roleGrantPermission( + rootRole, + rootRoleKeyRangeBegin, + rootRoleKeyRangeEnd, + Permission.Type.READWRITE) + .get(); + authDisabledAuthClient + .roleGrantPermission( + userRole, + userRoleKeyRangeBegin, + userRoleKeyRangeEnd, + Permission.Type.READWRITE) + .get(); + + authDisabledAuthClient.userAdd(root, rootPass).get(); + authDisabledAuthClient.userAdd(user, userPass).get(); + + authDisabledAuthClient.userChangePassword(user, userNewPass).get(); + + List users = authDisabledAuthClient.userList().get().getUsers(); + assertThat(users).containsOnly(rootString, userString); + + authDisabledAuthClient.userGrantRole(root, rootRole).get(); + authDisabledAuthClient.userGrantRole(user, rootRole).get(); + authDisabledAuthClient.userGrantRole(user, userRole).get(); + + assertThat(authDisabledAuthClient.userGet(root).get().getRoles()) + .containsOnly(rootRoleString); + assertThat(authDisabledAuthClient.userGet(user).get().getRoles()) + .containsOnly(rootRoleString, userRoleString); + + // TODO: + // There are significant changes to the auth in version 0.7.0, and currently it does + // not support the authed client implementation of version 0.6.1. + // The current modification should support version 0.7.0. + + // authDisabledAuthClient.authEnable().get(); + // + // final Client userClient = clientBuilder.user(user).password(userNewPass).build(); + // final Client rootClient = clientBuilder.user(root).password(rootPass).build(); + // + // userClient.getKVClient().put(rootRoleKey, rootRoleValue).get(); + // userClient.getKVClient().put(userRoleKey, userRoleValue).get(); + // userClient.getKVClient().get(rootRoleKey).get(); + // userClient.getKVClient().get(userRoleKey).get(); + // + // assertThatThrownBy(() -> authDisabledKVClient.put(rootRoleKey, + // rootRoleValue).get()) + // .hasMessageContaining("etcdserver: user name is empty"); + // assertThatThrownBy(() -> authDisabledKVClient.put(userRoleKey, + // rootRoleValue).get()) + // .hasMessageContaining("etcdserver: user name is empty"); + // assertThatThrownBy(() -> authDisabledKVClient.get(rootRoleKey).get()) + // .hasMessageContaining("etcdserver: user name is empty"); + // assertThatThrownBy(() -> authDisabledKVClient.get(userRoleKey).get()) + // .hasMessageContaining("etcdserver: user name is empty"); + // + // AuthRoleGetResponse roleGetResponse = + // userClient.getAuthClient().roleGet(rootRole).get(); + // assertThat(roleGetResponse.getPermissions().size()).isNotEqualTo(0); + // + // roleGetResponse = userClient.getAuthClient().roleGet(userRole).get(); + // assertThat(roleGetResponse.getPermissions().size()).isNotEqualTo(0); + // + // rootClient.getAuthClient().userRevokeRole(user, rootRole).get(); + // + // final KV kvClient = userClient.getKVClient(); + // // verify the access to root role is revoked for user. + // assertThatThrownBy(() -> kvClient.get(rootRoleKey).get()).isNotNull(); + // // verify userRole is still valid. + // assertThat(kvClient.get(userRoleKey).get().getCount()).isNotEqualTo(0); + // + // rootClient + // .getAuthClient() + // .roleRevokePermission(userRole, userRoleKeyRangeBegin, + // userRoleKeyRangeEnd) + // .get(); + // + // // verify the access to foo is revoked for user. + // assertThatThrownBy(() -> + // userClient.getKVClient().get(userRoleKey).get()).isNotNull(); + // + // rootClient.getAuthClient().authDisable().get(); + // + // authDisabledAuthClient.userDelete(root).get(); + // authDisabledAuthClient.userDelete(user).get(); + // + // authDisabledAuthClient.roleDelete(rootRole).get(); + // authDisabledAuthClient.roleDelete(userRole).get(); + } +} diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java new file mode 100644 index 0000000..875054f --- /dev/null +++ b/jxline-core/src/test/java/KVTest.java @@ -0,0 +1,229 @@ +import cloud.xline.jxline.Client; +import cloud.xline.jxline.KV; +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.DeleteResponse; +import cloud.xline.jxline.kv.GetResponse; +import cloud.xline.jxline.kv.PutResponse; +import cloud.xline.jxline.kv.TxnResponse; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.CmpTarget; +import cloud.xline.jxline.op.Op; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import static org.assertj.core.api.Assertions.*; +import static utils.Utils.*; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Timeout(value = 20) +public class KVTest { + private static KV kvClient; + + private static final ByteSequence SAMPLE_KEY = bytesOf("sample_key"); + private static final ByteSequence SAMPLE_VALUE = bytesOf("sample_value"); + private static final ByteSequence SAMPLE_KEY_2 = bytesOf("sample_key2"); + private static final ByteSequence SAMPLE_VALUE_2 = bytesOf("sample_value2"); + private static final ByteSequence SAMPLE_KEY_3 = bytesOf("sample_key3"); + + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; + + @BeforeAll + static void onConnect() { + kvClient = Client.builder().endpoints(INIT_ENDPOINT).build().getKVClient(); + } + + @Test + public void testItWorks() throws Exception { + ByteSequence key = ByteSequence.from("Hello Xline", Charset.defaultCharset()); + ByteSequence value = ByteSequence.from("Hi", Charset.defaultCharset()); + PutResponse putResponse = kvClient.put(key, value).get(); + assertThat(putResponse).isNotNull(); + GetResponse getResponse = kvClient.get(key).get(); + assertThat(getResponse).isNotNull(); + assertThat(getResponse.getCount()).isEqualTo(1); + assertThat(getResponse.getKvs().get(0).getValue()).isEqualTo(value); + } + + @Test + public void testByteSequence() { + ByteSequence prefix = bytesOf("/test-service/"); + ByteSequence subPrefix = bytesOf("uuids/"); + + String keyString = randomString(); + ByteSequence key = bytesOf(keyString); + ByteSequence prefixedKey = prefix.concat(subPrefix).concat(key); + assertThat(prefixedKey.startsWith(prefix)).isTrue(); + assertThat( + prefixedKey + .substring(prefix.size() + subPrefix.size()) + .toString(StandardCharsets.UTF_8)) + .isEqualTo(keyString); + assertThat(prefixedKey.substring(prefix.size(), prefix.size() + subPrefix.size())) + .isEqualTo(subPrefix); + } + + @Test + public void testPut() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY, SAMPLE_VALUE); + PutResponse response = feature.get(); + assertThat(response.getHeader()).isNotNull(); + assertThat(!response.hasPrevKv()).isTrue(); + } + + @Test + public void testGet() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_2, SAMPLE_VALUE_2); + feature.get(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_2); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE_2.toString(StandardCharsets.UTF_8)); + assertThat(!response.isMore()).isTrue(); + } + + @Test + public void testGetWithRev() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE); + PutResponse putResp = feature.get(); + kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE_2).get(); + GetOption option = + GetOption.builder().withRevision(putResp.getHeader().getRevision()).build(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_3, option); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testDelete() throws Exception { + // Put content so that we actually have something to delete + testPut(); + + ByteSequence keyToDelete = SAMPLE_KEY; + + // count keys about to delete + CompletableFuture getFeature = kvClient.get(keyToDelete); + GetResponse resp = getFeature.get(); + + // delete the keys + CompletableFuture deleteFuture = kvClient.delete(keyToDelete); + DeleteResponse delResp = deleteFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size()); + } + + @Test + public void testGetSortedPrefix() throws Exception { + String prefix = randomString(); + int numPrefix = 3; + putKeysWithPrefix(prefix, numPrefix); + + GetOption option = + GetOption.builder() + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.DESCEND) + .isPrefix(true) + .build(); + CompletableFuture getFeature = kvClient.get(bytesOf(prefix), option); + GetResponse response = getFeature.get(); + + assertThat(response.getKvs()).hasSize(numPrefix); + for (int i = 0; i < numPrefix; i++) { + assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8)) + .isEqualTo(prefix + (numPrefix - i - 1)); + assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(String.valueOf(numPrefix - i - 1)); + } + } + + @Test + public void testGetAndDeleteWithPrefix() throws Exception { + String prefix = randomString(); + ByteSequence key = bytesOf(prefix); + int numPrefixes = 10; + + putKeysWithPrefix(prefix, numPrefixes); + + // verify get withPrefix. + CompletableFuture getFuture = + kvClient.get(key, GetOption.builder().isPrefix(true).build()); + GetResponse getResp = getFuture.get(); + assertThat(getResp.getCount()).isEqualTo(numPrefixes); + + // verify del withPrefix. + DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build(); + CompletableFuture delFuture = kvClient.delete(key, deleteOpt); + DeleteResponse delResp = delFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(numPrefixes); + } + + private static void putKeysWithPrefix(String prefix, int numPrefixes) + throws ExecutionException, InterruptedException { + for (int i = 0; i < numPrefixes; i++) { + ByteSequence key = bytesOf(prefix + i); + ByteSequence value = bytesOf("" + i); + kvClient.put(key, value).get(); + } + } + + @Test + public void testTxn() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.GREATER, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testTxnForCmpOpNotEqual() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.NOT_EQUAL, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } +} diff --git a/jxline-core/src/test/java/ProtocolTest.java b/jxline-core/src/test/java/ProtocolTest.java new file mode 100644 index 0000000..ffdaaf3 --- /dev/null +++ b/jxline-core/src/test/java/ProtocolTest.java @@ -0,0 +1,51 @@ +import cloud.xline.jxline.Client; +import cloud.xline.jxline.ProtocolClient; +import com.google.protobuf.ByteString; +import com.xline.protobuf.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.assertj.core.api.Assertions.*; + +@Timeout(value = 20) +public class ProtocolTest { + private static ProtocolClient client; + + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; + + @BeforeAll + static void onConnect() { + client = Client.builder().endpoints(INIT_ENDPOINT).build().getProtocolClient(); + } + + @Test + void testItWorks() throws Exception { + Command put = + Command.newBuilder() + .setRequest( + RequestWithToken.newBuilder() + .setPutRequest( + PutRequest.newBuilder() + .setKey(ByteString.copyFromUtf8("Hello")) + .setValue(ByteString.copyFromUtf8("Xline")) + .build()) + .build()) + .build(); + PutResponse putResp = client.propose(put, false, (sr, asr) -> sr.getPutResponse()).get(); + assertThat(putResp).isNotNull(); + Command get = + Command.newBuilder() + .setRequest( + RequestWithToken.newBuilder() + .setRangeRequest( + RangeRequest.newBuilder() + .setKey(ByteString.copyFromUtf8("Hello")) + .build())) + .build(); + RangeResponse getResp = client.propose(get, true, (sr, asr) -> sr.getRangeResponse()).get(); + assertThat(getResp).isNotNull(); + assertThat(getResp.getCount()).isEqualTo(1); + assertThat(getResp.getKvs(0).getValue()).isEqualTo(ByteString.copyFromUtf8("Xline")); + } +} diff --git a/jxline-core/src/test/java/utils/Utils.java b/jxline-core/src/test/java/utils/Utils.java new file mode 100644 index 0000000..c89adb0 --- /dev/null +++ b/jxline-core/src/test/java/utils/Utils.java @@ -0,0 +1,16 @@ +package utils; + +import io.etcd.jetcd.ByteSequence; + +import java.nio.charset.StandardCharsets; + +public class Utils { + + public static ByteSequence bytesOf(final String string) { + return ByteSequence.from(string, StandardCharsets.UTF_8); + } + + public static String randomString() { + return java.util.UUID.randomUUID().toString(); + } +} diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh deleted file mode 100755 index f522f54..0000000 --- a/scripts/quick_start.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -DIR=$( - cd "$(dirname "$0")" - pwd -) -SERVERS=("172.20.0.2" "172.20.0.3" "172.20.0.4" "172.20.0.5") -MEMBERS="node1=${SERVERS[1]}:2379,node2=${SERVERS[2]}:2379,node3=${SERVERS[3]}:2379" - -# run xline node by index -# args: -# $1: index of the node -run_xline() { - cmd="/usr/local/bin/xline \ - --name node${1} \ - --members ${MEMBERS} \ - --storage-engine rocksdb \ - --data-dir /usr/local/xline/data-dir \ - --auth-public-key /mnt/public.pem \ - --auth-private-key /mnt/private.pem" - - if [ ${1} -eq 1 ]; then - cmd="${cmd} --is-leader" - fi - - docker exec -e RUST_LOG=debug -d node${1} ${cmd} - echo "command is: docker exec -e RUST_LOG=debug -d node${1} ${cmd}" -} - -# run cluster of xline/etcd in container -run_cluster() { - echo cluster starting - run_xline 1 & - run_xline 2 & - run_xline 3 & - wait - echo cluster started -} - -# stop all containers -stop_all() { - echo stopping - for name in "node1" "node2" "node3" "node4"; do - docker_id=$(docker ps -qf "name=${name}") - if [ -n "$docker_id" ]; then - docker stop $docker_id - fi - done - sleep 1 - echo stopped -} - -# run container of xline/etcd use specified image -# args: -# $1: size of cluster -run_container() { - echo container starting - size=${1} - image="ghcr.io/xline-kv/xline:latest" - for ((i = 1; i <= ${size}; i++)); do - docker run -d -it --rm --name=node${i} --net=xline_net --ip=${SERVERS[$i]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt ${image} bash & - done - docker run -d -it --rm --name=node4 --net=xline_net --ip=${SERVERS[0]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt gcr.io/etcd-development/etcd:v3.5.5 bash & - wait - echo container started -} - -stop_all -docker network create --subnet=172.20.0.0/24 xline_net >/dev/null 2>&1 - -run_container 3 -run_cluster