Skip to content

Commit

Permalink
keepalive OpensergoClient
Browse files Browse the repository at this point in the history
  • Loading branch information
jnan806 committed Oct 13, 2022
1 parent 4270c11 commit 5afed50
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 30 deletions.
63 changes: 62 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,62 @@
# OpenSergo Java SDK
# OpenSergo Java SDK


## How to use

### scene 1 : subscribe config-data
``` java
public static void main(String[] args) throws Exception {

// instant OpenSergoClient
OpenSergoClient openSergoClient = new OpenSergoClient("33.1.33.1", 10246);

// registry SubscribeInfo of FaultToleranceRule
// 1. instant SubscribeKey
SubscribeKey subscribeKey = new SubscribeKey("default", "foo-app", ConfigKind.FAULT_TOLERANCE_RULE);
// 2. construct SubscribeInfo
OpensergoClientSubscribeInfo opensergoClientSubscribeInfo = new OpensergoClientSubscribeInfo(subscribeKey);
opensergoClientSubscribeInfo.addSubscriber(new DefaultFaulttoleranceSubscriber());
// 2. registry
openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo);

// start OpenSergoClient
openSergoClient.start();
}
```

### scene 2 : subscribe config-data with custom-logic when config-data changed.
Add a subscriber by implements the function in `subscribe.Subscriber`.
``` java
public class TestSubscriber implements OpenSergoConfigSubscriber {
@Override
public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) {
System.out.println(data);
return true;
}
}
```
And then registry it to `subscriber.SubscriberRegistry`.
``` java
public static void main(String[] args) throws Exception {

// instant OpenSergoClient
OpenSergoClient openSergoClient = new OpenSergoClient("33.1.33.1", 10246);

// registry SubscribeInfo of FaultToleranceRule
// 1. instant SubscribeKey
SubscribeKey subscribeKey = new SubscribeKey("default", "foo-app", ConfigKind.FAULT_TOLERANCE_RULE);
// 2. instant Subscriber
TestSubscriber testSubscriber = new TestSubscriber();
// 3. construct SubscribeInfo
OpensergoClientSubscribeInfo opensergoClientSubscribeInfo = new OpensergoClientSubscribeInfo(subscribeKey);
opensergoClientSubscribeInfo.addSubscriber(testSubscriber);
// 4. registry
openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo);

openSergoClient.start();

// registry after OpenSergoClient started
opensergoClientSubscribeInfo.addSubscriber(new OpensergoConfigDefaultSubscriber());
openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo);
}
```
76 changes: 59 additions & 17 deletions src/main/java/io/opensergo/OpenSergoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import io.opensergo.proto.transport.v1.SubscribeOpType;
import io.opensergo.proto.transport.v1.SubscribeRequest;
import io.opensergo.proto.transport.v1.SubscribeRequestTarget;
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
import io.opensergo.subscribe.SubscribeKey;
import io.opensergo.subscribe.SubscribeRegistry;
import io.opensergo.subscribe.SubscribedConfigCache;
import io.opensergo.util.AssertUtils;
import io.opensergo.util.IdentifierUtils;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -46,6 +47,7 @@ public class OpenSergoClient implements AutoCloseable {
private final SubscribeRegistry subscribeRegistry;

private AtomicInteger reqId;
protected volatile OpensergoClientStatus status;

public OpenSergoClient(String host, int port) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
Expand All @@ -56,17 +58,64 @@ public OpenSergoClient(String host, int port) {
this.configCache = new SubscribedConfigCache();
this.subscribeRegistry = new SubscribeRegistry();
this.reqId = new AtomicInteger(0);
status = OpensergoClientStatus.INITIAL;
}

public void registerSubscribeInfo(OpensergoClientSubscribeInfo subscribeInfo) {
// Register subscriber to local.
if (Optional.of(subscribeInfo.getSubscriberList()).isPresent() && subscribeInfo.getSubscriberList().size() > 0) {
subscribeInfo.getSubscriberList().forEach(subscriber -> {
this.subscribeRegistry.registerSubscriber(subscribeInfo.getSubscribeKey(), subscriber);
OpenSergoLogger.info("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}", subscribeInfo.getSubscribeKey(), subscriber);
});
}
}

public void start() throws Exception {
OpenSergoLogger.info("OpensergoClient is starting...");

if (status == OpensergoClientStatus.INITIAL) {
OpenSergoLogger.info("open keepavlive thread");
new Thread(this::keepAlive).start();
}

status = OpensergoClientStatus.STARTING;

this.requestAndResponseWriter = transportGrpcStub.withWaitForReady()
.subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry));
.subscribeConfig(new OpenSergoSubscribeClientObserver(this));

OpenSergoLogger.info("begin to subscribe config-data...");
this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> {
this.subscribeConfig(subscribeKey);
});

OpenSergoLogger.info("openSergoClient is started");
status = OpensergoClientStatus.STARTED;
}

private void keepAlive() {
try {
if (status != OpensergoClientStatus.STARTING
&& status != OpensergoClientStatus.STARTED
&& status != OpensergoClientStatus.SHUTDOWN) {
OpenSergoLogger.info("try to restart openSergoClient...");
this.start();
}
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
if( status != OpensergoClientStatus.SHUTDOWN) {
keepAlive();
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void close() throws Exception {
requestAndResponseWriter.onCompleted();

status = OpensergoClientStatus.SHUTDOWN;

// gracefully drain the requests, then close the connection
channel.shutdown();
}
Expand All @@ -77,8 +126,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");

if (requestAndResponseWriter == null) {
// TODO: return status that indicates not ready
throw new IllegalStateException("gRPC stream is not ready");
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
status = OpensergoClientStatus.INTERRUPTED;
}
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
Expand All @@ -97,17 +146,13 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
}

public boolean subscribeConfig(SubscribeKey subscribeKey) {
return subscribeConfig(subscribeKey, null);
}

public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscriber subscriber) {
AssertUtils.assertNotNull(subscribeKey, "subscribeKey cannot be null");
AssertUtils.assertNotNull(subscribeKey.getApp(), "app cannot be null");
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");

if (requestAndResponseWriter == null) {
// TODO: return status that indicates not ready
throw new IllegalStateException("gRPC stream is not ready");
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
status = OpensergoClientStatus.INTERRUPTED;
}
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
Expand All @@ -121,18 +166,15 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
// Send SubscribeRequest
requestAndResponseWriter.onNext(request);

// Register subscriber to local.
if (subscriber != null) {
subscribeRegistry.registerSubscriber(subscribeKey, subscriber);
OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}",
subscribeKey, subscriber);
}

return true;
}

public SubscribedConfigCache getConfigCache() {
return configCache;
}

public SubscribeRegistry getSubscribeRegistry() {
return subscribeRegistry;
}

}
22 changes: 11 additions & 11 deletions src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import io.opensergo.proto.transport.v1.SubscribeResponse;
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
import io.opensergo.subscribe.SubscribeKey;
import io.opensergo.subscribe.SubscribeRegistry;
import io.opensergo.subscribe.SubscribedConfigCache;
import io.opensergo.util.StringUtils;

/**
Expand All @@ -41,13 +39,10 @@ public class OpenSergoSubscribeClientObserver implements ClientResponseObserver<

private ClientCallStreamObserver<SubscribeRequest> requestStream;

private final SubscribedConfigCache configCache;
private final SubscribeRegistry subscribeRegistry;
private OpenSergoClient openSergoClient;

public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache,
SubscribeRegistry subscribeRegistry) {
this.configCache = configCache;
this.subscribeRegistry = subscribeRegistry;
public OpenSergoSubscribeClientObserver(OpenSergoClient openSergoClient) {
this.openSergoClient = openSergoClient;
}

@Override
Expand All @@ -58,7 +53,7 @@ public void beforeStart(ClientCallStreamObserver<SubscribeRequest> requestStream
private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion)
throws Exception {
long receivedVersion = dataWithVersion.getVersion();
SubscribedData cachedData = configCache.getDataFor(subscribeKey);
SubscribedData cachedData = this.openSergoClient.getConfigCache().getDataFor(subscribeKey);
if (cachedData != null && cachedData.getVersion() > receivedVersion) {
// The upcoming data is out-dated, so we'll not resolve the push request.
return new LocalDataNotifyResult().setCode(OpenSergoTransportConstants.CODE_ERROR_VERSION_OUTDATED);
Expand All @@ -67,9 +62,9 @@ private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWi
// Decode actual data from the raw "Any" data.
List<Object> dataList = decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList());
// Update to local config cache.
configCache.updateData(subscribeKey, dataList, receivedVersion);
this.openSergoClient.getConfigCache().updateData(subscribeKey, dataList, receivedVersion);

List<OpenSergoConfigSubscriber> subscribers = subscribeRegistry.getSubscribersOf(subscribeKey);
List<OpenSergoConfigSubscriber> subscribers = this.openSergoClient.getSubscribeRegistry().getSubscribersOf(subscribeKey);
if (subscribers == null || subscribers.isEmpty()) {
// no-subscriber is acceptable (just for cache-and-pull mode)
return LocalDataNotifyResult.withSuccess(dataList);
Expand Down Expand Up @@ -178,6 +173,11 @@ private List<Object> decodeActualData(String kind, List<Any> rawList) throws Exc

@Override
public void onError(Throwable t) {
// TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver<SubscribeRequest>
io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode();
if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) {
this.openSergoClient.status = OpensergoClientStatus.INTERRUPTED;
}
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t);
}

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/opensergo/OpensergoClientStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022, OpenSergo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensergo;

/**
* @author Jiangnan Jia
**/
public enum OpensergoClientStatus {

/* initial*/
INITIAL,
STARTING,
STARTED,
INTERRUPTED,
SHUTDOWN

}
68 changes: 68 additions & 0 deletions src/main/java/io/opensergo/OpensergoClientSubscribeInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022, OpenSergo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensergo;

import com.google.common.collect.Lists;
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
import io.opensergo.subscribe.SubscribeKey;

import java.util.List;
import java.util.stream.Collectors;

/**
* @author Jiangnan Jia
**/
public class OpensergoClientSubscribeInfo {

private SubscribeKey subscribeKey;
private List<OpenSergoConfigSubscriber> subscriberList;

public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey) {
this.subscribeKey = subscribeKey;
this.subscriberList = Lists.newArrayList();
}
public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey, List<OpenSergoConfigSubscriber> subscriberList) {
this.subscribeKey = subscribeKey;
this.subscriberList = subscriberList;
}

public OpensergoClientSubscribeInfo addSubscriber(OpenSergoConfigSubscriber subscriber) {
int sameClassNum = this.subscriberList.stream()
.filter(item -> item.getClass() == subscriber.getClass())
.collect(Collectors.toList())
.size();
if (sameClassNum == 0) {
this.subscriberList.add(subscriber);
}
return this;
}

public SubscribeKey getSubscribeKey() {
return subscribeKey;
}

public List<OpenSergoConfigSubscriber> getSubscriberList() {
return subscriberList;
}

@Override
public String toString() {
return "OpensergoClientSubscribeInfo{" +
"subscribeKey=" + subscribeKey +
", subscriberList=" + subscriberList +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022, OpenSergo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensergo.subscribe;

/**
* @author Jiangnan Jia
**/
public class DefaultFaulttoleranceSubscriber implements OpenSergoConfigSubscriber {
@Override
public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) {
System.out.println(data);
return true;
}
}
Loading

0 comments on commit 5afed50

Please sign in to comment.