Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non blocking client 4 #12

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: implement connection manager
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Feb 27, 2024
commit bd16d2c1d9a91d2c2521df52675540c64727520c
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,25 @@
import io.etcd.jetcd.ByteSequence;
import io.grpc.*;
import io.grpc.netty.NegotiationType;
import io.grpc.stub.AbstractStub;
import io.netty.channel.ChannelOption;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.grpc.VertxChannelBuilder;

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class ClientConnectionManager {
private final Object lock;
private final ClientBuilder builder;
private final ExecutorService executorService;
private volatile Vertx vertx;

/// Integrated channel
private volatile ManagedChannel initChannel;

ClientConnectionManager(ClientBuilder builder) {
this.lock = new Object();
this.builder = builder;
Expand Down Expand Up @@ -144,6 +149,37 @@ public void start(
return channelBuilder;
}

public ManagedChannel getInitChannel() {
if (this.initChannel == null) {
synchronized (lock) {
if (this.initChannel == null) {
this.initChannel = defaultChannelBuilder().build();
}
}
}
return this.initChannel;
}

/**
* Create stub with saved integrated channel. Used to create some xline servers
*
* @param supplier the stub supplier
* @param <T> the type of stub
* @return the attached stub
*/
<T extends AbstractStub<T>> T newStub(Function<ManagedChannel, T> supplier) {
return newStub(supplier, this.getInitChannel());
}

private <T extends AbstractStub<T>> T newStub(
Function<ManagedChannel, T> stubCustomizer, ManagedChannel channel) {
T stub = stubCustomizer.apply(channel);
if (builder.waitForReady()) {
stub = stub.withWaitForReady();
}
return stub;
}

Vertx vertx() {
if (this.vertx == null) {
synchronized (this.lock) {
Expand Down
147 changes: 147 additions & 0 deletions jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cloud.xline.jxline.impl;

import com.curp.protobuf.*;
import io.grpc.ManagedChannel;

import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class ProtocolClient extends Impl {

private final State state;

ProtocolClient(ClientConnectionManager connectionManager) {
super(connectionManager);
this.state = getInitState();
}

State getInitState() {
ManagedChannel initChannel = this.connectionManager().getInitChannel();
ProtocolGrpc.ProtocolBlockingStub initStub = ProtocolGrpc.newBlockingStub(initChannel);
FetchClusterResponse response = null;
do {
try {
response =
CompletableFuture.supplyAsync(
() ->
initStub.fetchCluster(
FetchClusterRequest.newBuilder()
.setLinearizable(false)
.build()),
this.connectionManager().getExecutorService())
.get();
} catch (Exception ignore) {
}
} while (response == null || !response.hasLeaderId());

HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> stubs = new HashMap<>();
for (Member member : response.getMembersList()) {
String target =
StreamSupport.stream(
member.getAddrsList().stream().map(URI::create).spliterator(),
false)
.map(e -> e.getHost() + (e.getPort() != -1 ? ":" + e.getPort() : ""))
.distinct()
.collect(Collectors.joining(","));
ManagedChannel channel = this.connectionManager().defaultChannelBuilder(target).build();
VertxProtocolGrpc.ProtocolVertxStub stub = VertxProtocolGrpc.newVertxStub(channel);
stubs.put(member.getId(), stub);
}

return new State(
response.getLeaderId(), response.getTerm(), response.getClusterVersion(), stubs);
}

private class State {
private final ReadWriteLock lock;
private long leaderId;
private long term;
private long clusterVersion;
private HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> stubs;

State(
long leaderId,
long term,
long clusterVersion,
HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> stubs) {
this.lock = new ReentrantReadWriteLock();
this.leaderId = leaderId;
this.term = term;
this.clusterVersion = clusterVersion;
this.stubs = stubs;
}

long getClusterVersion() {
this.lock.readLock().lock();
long version = this.clusterVersion;
this.lock.readLock().unlock();
return version;
}

HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> getStubs() {
this.lock.readLock().lock();
HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> stubs = this.stubs;
this.lock.readLock().unlock();
return stubs;
}

long getLeader() {
this.lock.readLock().lock();
long res = this.leaderId;
this.lock.readLock().unlock();
return res;
}

void checkUpdate(FetchClusterResponse res) {
try {
this.lock.writeLock().lock();
if (res.getTerm() < this.term) {
return;
}
if (res.hasLeaderId() && this.term < res.getTerm()) {
this.term = res.getTerm();
this.leaderId = res.getLeaderId();
logger().info("client term updates to " + this.term);
logger().info("client leader id updates to " + this.leaderId);
}
if (res.getClusterVersion() == this.clusterVersion) {
return;
}
this.clusterVersion = res.getClusterVersion();
HashMap<Long, VertxProtocolGrpc.ProtocolVertxStub> stubs = new HashMap<>();
for (Member member : res.getMembersList()) {
String target =
StreamSupport.stream(
member.getAddrsList().stream()
.map(URI::create)
.spliterator(),
false)
.map(
e ->
e.getHost()
+ (e.getPort() != -1
? ":" + e.getPort()
: ""))
.distinct()
.collect(Collectors.joining(","));
ManagedChannel channel =
connectionManager().defaultChannelBuilder(target).build();
VertxProtocolGrpc.ProtocolVertxStub stub =
VertxProtocolGrpc.newVertxStub(channel);
stubs.put(member.getId(), stub);
}
// TODO: do NOT drop the old stubs, instead modify the stubs (use ConcurrentHashMap)
if (!stubs.isEmpty()) {
this.stubs = stubs;
}
} finally {
this.lock.writeLock().unlock();
}
}
}
}