From 11f1534a8a8ec1d48d809719c13aa06f5beee160 Mon Sep 17 00:00:00 2001 From: wallezhang Date: Fri, 14 Oct 2022 16:01:30 +0800 Subject: [PATCH] Refactor: Trigger the listener added for the first time (#225) * refactor: Trigger the listener added for the first time * refactor: Trigger the watch listener before adding to watchers --- .../polaris/discovery/client/flow/WatchFlow.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/WatchFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/WatchFlow.java index 567cbd2ac..20bdf1e00 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/WatchFlow.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/WatchFlow.java @@ -36,8 +36,10 @@ import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.client.util.Utils; import com.tencent.polaris.logging.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -45,6 +47,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import org.slf4j.Logger; /** @@ -77,7 +80,18 @@ public WatchServiceResponse commonWatchService(CommonWatchServiceRequest request InstancesResponse response = syncFlow.commonSyncGetAllInstances(request.getAllRequest()); watchers.computeIfAbsent(request.getSvcEventKey().getServiceKey(), key -> Collections.synchronizedSet(new HashSet<>())); - boolean result = watchers.get(serviceKey).addAll(request.getWatchServiceRequest().getListeners()); + List addListeners = request.getWatchServiceRequest().getListeners(); + Set existListeners = watchers.get(serviceKey); + List firstAddedListeners = addListeners.stream() + .filter(serviceListener -> !existListeners.contains(serviceListener)).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(firstAddedListeners)) { + ServiceChangeEvent event = ServiceChangeEvent.builder().serviceKey(serviceKey) + .addInstances(Arrays.asList(response.getInstances())) + .allInstances(Arrays.asList(response.getInstances())).build(); + firstAddedListeners.forEach( + serviceListener -> executor.execute(event.getServiceKey(), () -> serviceListener.onEvent(event))); + } + boolean result = existListeners.addAll(addListeners); return new WatchServiceResponse(response, result); }