Skip to content

Commit

Permalink
Ericsson#699 Reload nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Chandler committed Oct 10, 2024
1 parent 231e492 commit 79c3b1c
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void close(final UUID nodeID) throws IOException
/**
* Creates a new connection a node
* @param node
*
*
* @throws IOException
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
package com.ericsson.bss.cassandra.ecchronos.application.spring;
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.datastax.oss.driver.api.core.metadata.Node;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.metadata.Node;

import java.util.Comparator;
import java.util.UUID;


/***
* This will compare 2 nodes based on the Host id of the node
*/
public class NodeComparator implements Comparator<Node>
{
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.metadata.Node;
Expand All @@ -10,13 +25,16 @@ public class NodeListComparator {

private static final Logger LOG = LoggerFactory.getLogger(NodeListComparator.class);



private List<NodeChangeRecord> changesList= new LinkedList<NodeChangeRecord>();
boolean complareNodeLists(List<Node> oldNodes, List<Node> newNodes )
/***
* Compares 2 lists of nodes, finds new nodes, removed nodes and nodes where the ip address has changed
* The lists are sorted before comparison, so lists can be in any order
* @param oldNodes
* @param newNodes
* @return a list of NodeChangeRecord items, zero items in the list indicate the 2 lists are the same.
*/
List<NodeChangeRecord> compareNodeLists(List<Node> oldNodes, List<Node> newNodes )
{
changesList.clear();
boolean isSame = true;
List<NodeChangeRecord> changesList= new LinkedList<NodeChangeRecord>();
NodeComparator nodeComparator = new NodeComparator();
oldNodes.sort(nodeComparator);
newNodes.sort(nodeComparator);
Expand All @@ -33,62 +51,54 @@ boolean complareNodeLists(List<Node> oldNodes, List<Node> newNodes )

if (newNode == null)
{
LOG.info("new node missing ");
LOG.info("Node has been removed, Node id: {} ", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE ));
isSame = false;
oldNode = getNode(oldIterator);
}
else {
if (oldNode.getHostId().equals(newNode.getHostId())) {
if (oldNode.getHostId().equals(newNode.getHostId()))
{
// same host id, now check the ipaddress is still the same
if (!oldNode.getListenAddress().equals(newNode.getListenAddress())) {
LOG.info("different ipaddresses ");
if (!oldNode.getListenAddress().equals(newNode.getListenAddress()))
{
LOG.info("Node id {}, has a different ipaddress, it was {}, it is now {} ", oldNode.getHostId(), oldNode.getListenAddress(), newNode.getListenAddress() );
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.UPDATE));
isSame = false;
} else
LOG.info("All the same ");
}
oldNode = getNode(oldIterator);
newNode = getNode(newIterator);
} else {
if (oldNode.getHostId().compareTo(newNode.getHostId()) == 1) {
LOG.info("New Node added");
}
else
{
if (oldNode.getHostId().compareTo(newNode.getHostId()) == 1)
{
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT));
newNode = getNode(newIterator);
} else {
LOG.info("Old Node removed");
}
else
{
LOG.info("Node has been removed, Node id: {}", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE));
oldNode = getNode(oldIterator);

}
isSame = false;

}
}



}
while ( newNode != null) {

changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT ));
LOG.info("Extra node added");
isSame=false;
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
newNode = getNode(newIterator);
}


return isSame;
return changesList;
}

private Node getNode(Iterator<Node> oldIterator) {
Node oldNode;
if ( oldIterator.hasNext())
oldNode = oldIterator.next();
private Node getNode(Iterator<Node> iterator) {
Node node;
if ( iterator.hasNext())
node = iterator.next();
else
oldNode = null;
return oldNode;
}
public List<NodeChangeRecord> getChangesList() {
return changesList;
node = null;
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.PostConstruct;
Expand All @@ -34,7 +30,6 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Service;

import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -103,9 +98,10 @@ void reloadNodes() {
List<Node> oldNodes = myDistributedNativeConnectionProvider.getNodes();
List<Node> newNodes = myDistributedNativeConnectionProvider.reloadNodes();
CqlSession cqlSession = myDistributedNativeConnectionProvider.getCqlSession();
if (!nodeListComparator.complareNodeLists(oldNodes,newNodes)){
List<NodeChangeRecord> nodeChangeList = nodeListComparator.compareNodeLists(oldNodes,newNodes);
if (nodeChangeList.size() > 0){
myDistributedNativeConnectionProvider.setNodes(newNodes);
Iterator<NodeChangeRecord> iterator = nodeListComparator.getChangesList().iterator();
Iterator<NodeChangeRecord> iterator = nodeChangeList.iterator();
while (iterator.hasNext()){
NodeChangeRecord nodeChangeRecord = iterator.next();
if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import java.util.Optional;
import java.util.UUID;

/***
* Dummy node used for testing, only endPoint, listenAddress and hostId are set.
*/

public class DummyNode implements Node {

EndPoint endPoint;
Expand All @@ -22,7 +26,7 @@ public DummyNode(EndPoint endPoint, InetSocketAddress listenAddress, UUID hostId
this.listenAddress = listenAddress;
this.hostId = hostId;
}
@Override
@Override
public EndPoint getEndPoint() {
return endPoint;
}
Expand Down
Loading

0 comments on commit 79c3b1c

Please sign in to comment.