Skip to content

Commit

Permalink
IGNITE-23406 Change semantic of cluster init REST call
Browse files Browse the repository at this point in the history
  • Loading branch information
dant3 committed Oct 23, 2024
1 parent b37b538 commit 6a89634
Show file tree
Hide file tree
Showing 18 changed files with 640 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.ignite.internal.cluster.management;

import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.Collection;
import java.util.List;
Expand All @@ -36,11 +38,13 @@
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.network.messages.InitCompleteMessage;
import org.apache.ignite.internal.cluster.management.network.messages.InitErrorMessage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.configuration.validation.ConfigurationValidator;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.StringUtils;
import org.apache.ignite.network.ClusterNode;
Expand All @@ -56,6 +60,8 @@ public class ClusterInitializer {

private final ClusterService clusterService;

private final LogicalTopology logicalTopology;

private final ConfigurationDynamicDefaultsPatcher configurationDynamicDefaultsPatcher;

private final ConfigurationValidator clusterConfigurationValidator;
Expand All @@ -65,10 +71,12 @@ public class ClusterInitializer {
/** Constructor. */
public ClusterInitializer(
ClusterService clusterService,
LogicalTopology logicalTopology,
ConfigurationDynamicDefaultsPatcher configurationDynamicDefaultsPatcher,
ConfigurationValidator clusterConfigurationValidator
) {
this.clusterService = clusterService;
this.logicalTopology = logicalTopology;
this.configurationDynamicDefaultsPatcher = configurationDynamicDefaultsPatcher;
this.clusterConfigurationValidator = clusterConfigurationValidator;
}
Expand Down Expand Up @@ -182,6 +190,10 @@ public CompletableFuture<Void> initCluster(
}
}

private CompletableFuture<Void> awaitCorrectClusterTopology() {
return new ClusterNodesJoinAwaiter(clusterService, logicalTopology);
}

private CompletableFuture<Void> cancelInit(Collection<ClusterNode> nodes, Throwable e) {
CancelInitMessage cancelMessage = msgFactory.cancelInitMessage()
.reason(e.getMessage())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.cluster.management;

import static org.apache.ignite.internal.util.subscription.Publishers.combineLatest;
import static org.apache.ignite.internal.util.subscription.Publishers.filter;
import static org.apache.ignite.internal.util.subscription.Publishers.firstItem;

import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.network.ClusterNode;

public class ClusterNodesJoinAwaiter {
private final ClusterService clusterService;
private final LogicalTopology logicalTopology;

public ClusterNodesJoinAwaiter(ClusterService clusterService, LogicalTopology logicalTopology) {
this.clusterService = clusterService;
this.logicalTopology = logicalTopology;
}

public CompletableFuture<Void> awaitAllNodesToJoin() {
Publisher<Collection<ClusterNode>> physicalTopologyPublisher = physicalTopologyPublisher(clusterService.topologyService());
Publisher<LogicalTopologySnapshot> logicalTopologyPublisher = logicalTopologyPublisher(logicalTopology);

Publisher<Boolean> allNodesJoinedCluster =
combineLatest(physicalTopologyPublisher, logicalTopologyPublisher, (physical, logical) -> {
Set<UUID> physicalNodesSet = physical.stream().map(ClusterNode::id).collect(Collectors.toSet());
Set<UUID> logicalNodesSet = logical.nodes().stream().map(ClusterNode::id).collect(Collectors.toSet());

physicalNodesSet.removeAll(logicalNodesSet);

return physicalNodesSet.isEmpty();
});

return firstItem(filter(allNodesJoinedCluster, it -> it == Boolean.TRUE))
.thenApply(unused -> null);
}

private static Publisher<Collection<ClusterNode>> physicalTopologyPublisher(TopologyService topologyService) {
return subscriber -> {
TopologyEventHandler eventHandler = new TopologyEventHandler() {
@Override
public void onAppeared(ClusterNode member) {
subscriber.onNext(topologyService.allMembers());
}

@Override
public void onDisappeared(ClusterNode member) {
subscriber.onNext(topologyService.allMembers());
}
};

Subscription subscription = new Subscription() {
@Override
public void request(long n) {
// noop backpressure: our subscription is always hot, because it's based on a listener push semantics.
}

@Override
public void cancel() {
topologyService.removeEventHandler(eventHandler);
}
};


subscriber.onSubscribe(subscription);

topologyService.addEventHandler(eventHandler);

subscriber.onNext(topologyService.allMembers());
};
}

private static Publisher<LogicalTopologySnapshot> logicalTopologyPublisher(LogicalTopology logicalTopology) {
return subscriber -> {
LogicalTopologyEventListener eventListener = new LogicalTopologyEventListener() {
@Override
public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
subscriber.onNext(newTopology);
}

@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
subscriber.onNext(newTopology);
}

@Override
public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
subscriber.onNext(newTopology);
}
};

Subscription subscription = new Subscription() {
@Override
public void request(long n) {
// noop: our subscription is always hot, because it's based on a listener push semantics
}

@Override
public void cancel() {
logicalTopology.removeEventListener(eventListener);
}
};

subscriber.onSubscribe(subscription);
logicalTopology.addEventListener(eventListener);
subscriber.onNext(logicalTopology.getLogicalTopology());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage;
import org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
Expand Down Expand Up @@ -73,12 +74,13 @@ public class ClusterInitializerTest extends BaseIgniteAbstractTest {
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();

@BeforeEach
void setUp(@Mock ClusterService clusterService) {
void setUp(@Mock ClusterService clusterService, @Mock LogicalTopology logicalTopology) {
when(clusterService.messagingService()).thenReturn(messagingService);
when(clusterService.topologyService()).thenReturn(topologyService);

clusterInitializer = new ClusterInitializer(
clusterService,
logicalTopology,
hocon -> hocon,
new TestConfigurationValidator()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
Expand Down Expand Up @@ -125,14 +126,15 @@ public MockNode(
RaftGroupOptionsConfigurer cmgRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory, this.workDir.resolve("cmg/meta"));

LogicalTopology logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
this.clusterManager = new ClusterManagementGroupManager(
vaultManager,
new SystemDisasterRecoveryStorage(vaultManager),
clusterService,
new ClusterInitializer(clusterService, hocon -> hocon, new TestConfigurationValidator()),
new ClusterInitializer(clusterService, logicalTopology, hocon -> hocon, new TestConfigurationValidator()),
raftManager,
clusterStateStorage,
new LogicalTopologyImpl(clusterStateStorage),
logicalTopology,
new NodeAttributesCollector(nodeAttributes, storageProfilesConfiguration),
failureManager,
clusterIdHolder,
Expand Down
Loading

0 comments on commit 6a89634

Please sign in to comment.