From c9d48a2a332c445536e1403de622adef3a7ba47d Mon Sep 17 00:00:00 2001 From: jnan806 Date: Mon, 10 Oct 2022 17:34:35 +0800 Subject: [PATCH] keepalive OpensergoClient --- README.md | 63 ++++++++++++++++- .../java/io/opensergo/OpenSergoClient.java | 70 ++++++++++++++----- .../OpenSergoSubscribeClientObserver.java | 5 ++ .../io/opensergo/OpensergoClientStatus.java | 30 ++++++++ .../OpensergoClientSubscribeInfo.java | 68 ++++++++++++++++++ .../DefaultFaulttoleranceSubscriber.java | 27 +++++++ .../OpensergoConfigDefaultSubscriber.java | 29 ++++++++ .../subscribe/SubscribeRegistry.java | 16 ++++- 8 files changed, 290 insertions(+), 18 deletions(-) create mode 100644 src/main/java/io/opensergo/OpensergoClientStatus.java create mode 100644 src/main/java/io/opensergo/OpensergoClientSubscribeInfo.java create mode 100644 src/main/java/io/opensergo/subscribe/DefaultFaulttoleranceSubscriber.java create mode 100644 src/main/java/io/opensergo/subscribe/OpensergoConfigDefaultSubscriber.java diff --git a/README.md b/README.md index a2a6552..580cf2d 100644 --- a/README.md +++ b/README.md @@ -1 +1,62 @@ -# OpenSergo Java SDK \ No newline at end of file +# OpenSergo Java SDK + + +## How to use + +### scene 1 : subscribe config-data +``` java +public static void main(String[] args) throws Exception { + + // instant OpenSergoClient + OpenSergoClient openSergoClient = new OpenSergoClient("33.1.33.1", 10246); + + // registry SubscribeInfo of FaultToleranceRule + // 1. instant SubscribeKey + SubscribeKey subscribeKey = new SubscribeKey("default", "foo-app", ConfigKind.FAULT_TOLERANCE_RULE); + // 2. construct SubscribeInfo + OpensergoClientSubscribeInfo opensergoClientSubscribeInfo = new OpensergoClientSubscribeInfo(subscribeKey); + opensergoClientSubscribeInfo.addSubscriber(new DefaultFaulttoleranceSubscriber()); + // 2. registry + openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo); + + // start OpenSergoClient + openSergoClient.start(); +} +``` + +### scene 2 : subscribe config-data with custom-logic when config-data changed. +Add a subscriber by implements the function in `subscribe.Subscriber`. +``` java +public class TestSubscriber implements OpenSergoConfigSubscriber { + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + System.out.println(data); + return true; + } +} +``` +And then registry it to `subscriber.SubscriberRegistry`. +``` java +public static void main(String[] args) throws Exception { + + // instant OpenSergoClient + OpenSergoClient openSergoClient = new OpenSergoClient("33.1.33.1", 10246); + + // registry SubscribeInfo of FaultToleranceRule + // 1. instant SubscribeKey + SubscribeKey subscribeKey = new SubscribeKey("default", "foo-app", ConfigKind.FAULT_TOLERANCE_RULE); + // 2. instant Subscriber + TestSubscriber testSubscriber = new TestSubscriber(); + // 3. construct SubscribeInfo + OpensergoClientSubscribeInfo opensergoClientSubscribeInfo = new OpensergoClientSubscribeInfo(subscribeKey); + opensergoClientSubscribeInfo.addSubscriber(testSubscriber); + // 4. registry + openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo); + + openSergoClient.start(); + + // registry after OpenSergoClient started + opensergoClientSubscribeInfo.addSubscriber(new OpensergoConfigDefaultSubscriber()); + openSergoClient.registerSubscribeInfo(opensergoClientSubscribeInfo); +} +``` \ No newline at end of file diff --git a/src/main/java/io/opensergo/OpenSergoClient.java b/src/main/java/io/opensergo/OpenSergoClient.java index 0f3de80..7bfad71 100644 --- a/src/main/java/io/opensergo/OpenSergoClient.java +++ b/src/main/java/io/opensergo/OpenSergoClient.java @@ -23,13 +23,14 @@ import io.opensergo.proto.transport.v1.SubscribeOpType; import io.opensergo.proto.transport.v1.SubscribeRequest; import io.opensergo.proto.transport.v1.SubscribeRequestTarget; -import io.opensergo.subscribe.OpenSergoConfigSubscriber; import io.opensergo.subscribe.SubscribeKey; import io.opensergo.subscribe.SubscribeRegistry; import io.opensergo.subscribe.SubscribedConfigCache; import io.opensergo.util.AssertUtils; import io.opensergo.util.IdentifierUtils; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -46,6 +47,7 @@ public class OpenSergoClient implements AutoCloseable { private final SubscribeRegistry subscribeRegistry; private AtomicInteger reqId; + protected static volatile OpensergoClientStatus status; public OpenSergoClient(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) @@ -56,17 +58,64 @@ public OpenSergoClient(String host, int port) { this.configCache = new SubscribedConfigCache(); this.subscribeRegistry = new SubscribeRegistry(); this.reqId = new AtomicInteger(0); + status = OpensergoClientStatus.INITIAL; + } + + public void registerSubscribeInfo(OpensergoClientSubscribeInfo subscribeInfo) { + // Register subscriber to local. + if (Optional.of(subscribeInfo.getSubscriberList()).isPresent() && subscribeInfo.getSubscriberList().size() > 0) { + subscribeInfo.getSubscriberList().forEach(subscriber -> { + this.subscribeRegistry.registerSubscriber(subscribeInfo.getSubscribeKey(), subscriber); + OpenSergoLogger.info("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}", subscribeInfo.getSubscribeKey(), subscriber); + }); + } } public void start() throws Exception { + OpenSergoLogger.info("OpensergoClient is starting..."); + + if (status == OpensergoClientStatus.INITIAL) { + OpenSergoLogger.info("open keepavlive thread"); + new Thread(this::keepAlive).start(); + } + + status = OpensergoClientStatus.STARTING; + this.requestAndResponseWriter = transportGrpcStub.withWaitForReady() .subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry)); + + OpenSergoLogger.info("begin to subscribe config-data..."); + this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> { + this.subscribeConfig(subscribeKey); + }); + + OpenSergoLogger.info("openSergoClient is started"); + status = OpensergoClientStatus.STARTED; + } + + private void keepAlive() { + try { + if (status != OpensergoClientStatus.STARTING + && status != OpensergoClientStatus.STARTED + && status != OpensergoClientStatus.SHUTDOWN) { + OpenSergoLogger.info("try to restart openSergoClient..."); + this.start(); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + if( status != OpensergoClientStatus.SHUTDOWN) { + keepAlive(); + } + } catch (Exception e) { + e.printStackTrace(); + } } @Override public void close() throws Exception { requestAndResponseWriter.onCompleted(); + status = OpensergoClientStatus.SHUTDOWN; + // gracefully drain the requests, then close the connection channel.shutdown(); } @@ -77,8 +126,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) { AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpensergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -97,17 +146,13 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) { } public boolean subscribeConfig(SubscribeKey subscribeKey) { - return subscribeConfig(subscribeKey, null); - } - - public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscriber subscriber) { AssertUtils.assertNotNull(subscribeKey, "subscribeKey cannot be null"); AssertUtils.assertNotNull(subscribeKey.getApp(), "app cannot be null"); AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpensergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -121,13 +166,6 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri // Send SubscribeRequest requestAndResponseWriter.onNext(request); - // Register subscriber to local. - if (subscriber != null) { - subscribeRegistry.registerSubscriber(subscribeKey, subscriber); - OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", - subscribeKey, subscriber); - } - return true; } diff --git a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java index 1ea1b8d..ea59503 100644 --- a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java +++ b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java @@ -178,6 +178,11 @@ private List decodeActualData(String kind, List rawList) throws Exc @Override public void onError(Throwable t) { + // TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver + io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode(); + if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) { + OpenSergoClient.status = OpensergoClientStatus.INTERRUPTED; + } OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t); } diff --git a/src/main/java/io/opensergo/OpensergoClientStatus.java b/src/main/java/io/opensergo/OpensergoClientStatus.java new file mode 100644 index 0000000..99ac2e5 --- /dev/null +++ b/src/main/java/io/opensergo/OpensergoClientStatus.java @@ -0,0 +1,30 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo; + +/** + * @author Jiangnan Jia + **/ +public enum OpensergoClientStatus { + + /* initial*/ + INITIAL, + STARTING, + STARTED, + INTERRUPTED, + SHUTDOWN + +} diff --git a/src/main/java/io/opensergo/OpensergoClientSubscribeInfo.java b/src/main/java/io/opensergo/OpensergoClientSubscribeInfo.java new file mode 100644 index 0000000..99f1825 --- /dev/null +++ b/src/main/java/io/opensergo/OpensergoClientSubscribeInfo.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo; + +import com.google.common.collect.Lists; +import io.opensergo.subscribe.OpenSergoConfigSubscriber; +import io.opensergo.subscribe.SubscribeKey; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author Jiangnan Jia + **/ +public class OpensergoClientSubscribeInfo { + + private SubscribeKey subscribeKey; + private List subscriberList; + + public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey) { + this.subscribeKey = subscribeKey; + this.subscriberList = Lists.newArrayList(); + } + public OpensergoClientSubscribeInfo(SubscribeKey subscribeKey, List subscriberList) { + this.subscribeKey = subscribeKey; + this.subscriberList = subscriberList; + } + + public OpensergoClientSubscribeInfo addSubscriber(OpenSergoConfigSubscriber subscriber) { + int sameClassNum = this.subscriberList.stream() + .filter(item -> item.getClass() == subscriber.getClass()) + .collect(Collectors.toList()) + .size(); + if (sameClassNum == 0) { + this.subscriberList.add(subscriber); + } + return this; + } + + public SubscribeKey getSubscribeKey() { + return subscribeKey; + } + + public List getSubscriberList() { + return subscriberList; + } + + @Override + public String toString() { + return "OpensergoClientSubscribeInfo{" + + "subscribeKey=" + subscribeKey + + ", subscriberList=" + subscriberList + + '}'; + } +} diff --git a/src/main/java/io/opensergo/subscribe/DefaultFaulttoleranceSubscriber.java b/src/main/java/io/opensergo/subscribe/DefaultFaulttoleranceSubscriber.java new file mode 100644 index 0000000..718455b --- /dev/null +++ b/src/main/java/io/opensergo/subscribe/DefaultFaulttoleranceSubscriber.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo.subscribe; + +/** + * @author Jiangnan Jia + **/ +public class DefaultFaulttoleranceSubscriber implements OpenSergoConfigSubscriber { + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + System.out.println(data); + return true; + } +} diff --git a/src/main/java/io/opensergo/subscribe/OpensergoConfigDefaultSubscriber.java b/src/main/java/io/opensergo/subscribe/OpensergoConfigDefaultSubscriber.java new file mode 100644 index 0000000..5f5645b --- /dev/null +++ b/src/main/java/io/opensergo/subscribe/OpensergoConfigDefaultSubscriber.java @@ -0,0 +1,29 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo.subscribe; + +import io.opensergo.log.OpenSergoLogger; + +/** + * @author Jiangnan Jia + **/ +public class OpensergoConfigDefaultSubscriber implements OpenSergoConfigSubscriber { + @Override + public boolean onConfigUpdate(SubscribeKey subscribeKey, Object data) { + OpenSergoLogger.info(data.toString()); + return true; + } +} diff --git a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java index ed66be0..9cbebf9 100644 --- a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java +++ b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java @@ -16,9 +16,11 @@ package io.opensergo.subscribe; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import io.opensergo.util.AssertUtils; @@ -33,7 +35,19 @@ public void registerSubscriber(SubscribeKey key, OpenSergoConfigSubscriber subsc AssertUtils.assertNotNull(key, "subscribeKey cannot be null"); AssertUtils.assertNotNull(subscriber, "subscriber cannot be null"); List list = subscriberMap.computeIfAbsent(key, v -> new CopyOnWriteArrayList<>()); - list.add(subscriber); + + // distinct the same OpenSergoConfigSubscriber + int sameClassNum = list.stream() + .filter(item -> item.getClass() == subscriber.getClass()) + .collect(Collectors.toList()) + .size(); + if (sameClassNum == 0) { + list.add(subscriber); + } + } + + public Set getSubscriberKeysAll() { + return subscriberMap.keySet(); } public List getSubscribersOf(SubscribeKey key) {