Skip to content

Commit

Permalink
Major refactoring and bug fixes. Fast-bully, Gossip FD and Consensus w…
Browse files Browse the repository at this point in the history
…ork well and are more robust now.
  • Loading branch information
victorskl committed May 17, 2017
1 parent 0ea71e2 commit 1999d26
Show file tree
Hide file tree
Showing 36 changed files with 671 additions and 470 deletions.
4 changes: 2 additions & 2 deletions config/system.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ failure.detector.gossip=true
## in seconds
#
# time interval for consensus job
consensus.interval=5
consensus.interval=10
# consensus duration for collecting votes, no bigger than consensus.interval
consensus.vote.duration=4
consensus.vote.duration=5

####
## Timing setting for Leader Election
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public enum Protocol {
movejoin, serverchange, listserver, serverlist, servers, address,
authenticate, username, password, rememberme, authresponse, success, reason,
sessionid, notifyusersession, status, alive, managementport, serverup, notifyserverdown,
timestamp, gossip, heartbeatcountlist, startvote, answervote, vote, suspectserverid,
timestamp, gossip, heartbeatcountlist, startvote, answervote, vote, votedby, suspectserverid,
startelection, answerelection, coordinator, iamup, viewelection, nominationelection,
currentcoordinatorid, currentcoordinatoraddress, currentcoordinatorport, currentcoordinatormanagementport
}
108 changes: 64 additions & 44 deletions strike-server/src/main/java/strike/StrikeServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import strike.common.model.Protocol;
import strike.common.model.ServerInfo;
import strike.heartbeat.AliveJob;
import strike.heartbeat.ConsensusJob;
import strike.heartbeat.GossipJob;
import strike.model.Constant;
import strike.model.LocalChatRoomInfo;
import strike.model.RemoteChatRoomInfo;
import strike.service.*;
Expand All @@ -39,6 +39,7 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -102,7 +103,26 @@ public StrikeServer(String[] args) {

updateLogger();

// POST
serverState.setIsFastBully(systemProperties.getBoolean("election.fast.bully"));
// T2
serverState.setElectionAnswerTimeout(systemProperties.getLong("election.answer.timeout"));
// T3
serverState.setElectionCoordinatorTimeout(systemProperties.getLong("election.coordinator.timeout"));
// T4
serverState.setElectionNominationTimeout(systemProperties.getLong("election.nomination.timeout"));

serverState.setupConnectedServers();


// ********************
//
// POST Boot
//
// It is safer to set any initial ServerState values before this line.
// Below this line, the comm sub-system is booted using ServerState as it stands at this point.
//
// ********************


mainHall = "MainHall-" + serverInfo.getServerId();
LocalChatRoomInfo localChatRoomInfo = new LocalChatRoomInfo();
Expand All @@ -115,14 +135,14 @@ public StrikeServer(String[] args) {
//addMainHallsStatically();
syncChatRooms();

serverState.setupConnectedServers();
readElectionTimeoutConfigurations();
initiateCoordinator();

if (systemProperties.getBoolean("failure.detector.gossip")) {
logger.info("Failure Detection is running GOSSIP mode");
startGossip();
startConsensus();
} else {
logger.info("Failure Detection is running NAIVE mode");
startHeartBeat();
}

Expand All @@ -134,43 +154,35 @@ public StrikeServer(String[] args) {
}
}

/**
 * Copies the leader-election settings from {@code systemProperties} into {@code serverState}.
 * <p>
 * Properties read: {@code election.fast.bully} (boolean), {@code election.answer.timeout} (T2),
 * {@code election.coordinator.timeout} (T3) and {@code election.nomination.timeout} (T4),
 * the three timeouts being read as {@code long} values.
 */
private void readElectionTimeoutConfigurations() {
// Flag that is later read back through serverState.getIsFastBully().
serverState.setIsFastBully(systemProperties.getBoolean("election.fast.bully"));
// T2
serverState.setElectionAnswerTimeout(systemProperties.getLong("election.answer.timeout"));
// T3
serverState.setElectionCoordinatorTimeout(systemProperties.getLong("election.coordinator.timeout"));
// T4
serverState.setElectionNominationTimeout(systemProperties.getLong("election.nomination.timeout"));
}

/**
 * Kicks off the initial coordinator election.
 * <p>
 * When {@code serverState.getServerInfoList()} holds exactly one entry, {@code serverInfo} is set
 * as coordinator directly and no election is run. Otherwise the branch is chosen by
 * {@code serverState.getIsFastBully()}:
 * <ul>
 * <li>true: calls {@code sendIamUpMessage(...)} with the server list, then
 * {@code startWaitingForViewMessage(...)};</li>
 * <li>false: calls {@code startElection(...)} with the candidate server list, then
 * {@code startWaitingForAnswerMessage(...)}.</li>
 * </ul>
 * Every wait/start call is given {@code serverState.getElectionAnswerTimeout()}.
 * A {@code SchedulerException} is caught and logged here; it is not propagated.
 * <p>
 * NOTE(review): each branch contains two variants of the same calls, one passing
 * {@code StdSchedulerFactory.getDefaultScheduler()} and one without a scheduler argument.
 * This looks like the pre- and post-change lines of a diff shown together -- confirm which
 * variant is current before relying on this listing.
 */
private void initiateCoordinator() {
logger.debug("Starting initial coordinator election...");
if (serverState.getServerInfoList().size() == 1) {
// if there's only one server then no need of an election
serverState.setCoordinator(serverInfo);
} else {
if (serverState.getIsFastBully()) {

logger.info("Leader Election is running in FAST-BULLY mode");

new FastBullyElectionManagementService().sendIamUpMessage(serverState.getServerInfo(),
serverState.getServerInfoList());
try {
serverState.setViewMessageReceived(false);
// Variant that passes the default Quartz scheduler explicitly.
new FastBullyElectionManagementService().startWaitingForViewMessage(StdSchedulerFactory
.getDefaultScheduler(), serverState.getElectionAnswerTimeout());
//serverState.setViewMessageReceived(false);
// Variant that takes only the timeout.
new FastBullyElectionManagementService().startWaitingForViewMessage(serverState.getElectionAnswerTimeout());
} catch (SchedulerException e) {
logger.error("Error while waiting for the view message at fast bully election: " +
e.getLocalizedMessage());
}
} else {
try {
new BullyElectionManagementService()
.startElection(serverState.getServerInfo(), serverState.getCandidateServerInfoList(),
serverState.getElectionAnswerTimeout());
// Variant that passes the default Quartz scheduler explicitly.
new BullyElectionManagementService().startWaitingForAnswerMessage(serverState.getServerInfo(),
StdSchedulerFactory.getDefaultScheduler(), serverState.getElectionAnswerTimeout());
} catch (SchedulerException e) {
logger.error("Unable to start the default bully election : " + e.getLocalizedMessage());
}

logger.info("Leader Election is running in DEFAULT BULLY mode");

new BullyElectionManagementService()
.startElection(serverState.getServerInfo(), serverState.getCandidateServerInfoList(),
serverState.getElectionAnswerTimeout());

// Variant that takes only the server info and the timeout.
new BullyElectionManagementService().startWaitingForAnswerMessage(serverState.getServerInfo(),
serverState.getElectionAnswerTimeout());
}
}
}
Expand All @@ -179,20 +191,19 @@ private void startConsensus() {
try {

JobDetail consensusJob = JobBuilder.newJob(ConsensusJob.class)
.withIdentity("ConsensusJob", "group1").build();
.withIdentity(Constant.CONSENSUS_JOB, "group1").build();

consensusJob.getJobDataMap().put("consensusVoteDuration", systemProperties.getInt("consensus.vote.duration"));


Trigger consensusTrigger = TriggerBuilder
.newTrigger()
.withIdentity("consensusJobTrigger", "group1")
.withIdentity(Constant.CONSENSUS_JOB_TRIGGER, "group1")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(systemProperties.getInt("consensus.interval")).repeatForever())
.build();

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
Scheduler scheduler = Quartz.getInstance().getScheduler();

scheduler.start();
scheduler.scheduleJob(consensusJob, consensusTrigger);
Expand All @@ -206,19 +217,19 @@ private void startGossip() {
try {

JobDetail gossipJob = JobBuilder.newJob(GossipJob.class)
.withIdentity("GossipJob", "group1").build();
.withIdentity(Constant.GOSSIP_JOB, "group1").build();

gossipJob.getJobDataMap().put("aliveErrorFactor", systemProperties.getInt("alive.error.factor"));

Trigger gossipTrigger = TriggerBuilder
.newTrigger()
.withIdentity("aliveJobTrigger", "group1")
.withIdentity(Constant.GOSSIP_JOB_TRIGGER, "group1")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(systemProperties.getInt("alive.interval")).repeatForever())
.build();

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
Scheduler scheduler = Quartz.getInstance().getScheduler();
scheduler.start();
scheduler.scheduleJob(gossipJob, gossipTrigger);

Expand All @@ -231,19 +242,19 @@ private void startHeartBeat() {
try {

JobDetail aliveJob = JobBuilder.newJob(AliveJob.class)
.withIdentity("AliveJob", "group1").build();
.withIdentity(Constant.ALIVE_JOB, "group1").build();

aliveJob.getJobDataMap().put("aliveErrorFactor", systemProperties.getInt("alive.error.factor"));

Trigger aliveTrigger = TriggerBuilder
.newTrigger()
.withIdentity("AliveJobTrigger", "group1")
.withIdentity(Constant.ALIVE_JOB_TRIGGER, "group1")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(systemProperties.getInt("alive.interval")).repeatForever())
.build();

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
Scheduler scheduler = Quartz.getInstance().getScheduler();
scheduler.start();
scheduler.scheduleJob(aliveJob, aliveTrigger);

Expand All @@ -267,17 +278,26 @@ private void readServerConfiguration() {
private void updateLogger() {
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
org.apache.logging.log4j.core.config.Configuration config = ctx.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig("strike");

if (debug && !trace) {
loggerConfig.setLevel(Level.DEBUG);
ctx.updateLoggers();
logger.debug("Server is running in DEBUG mode");
for (Map.Entry entry : config.getLoggers().entrySet()) {
String name = (String) entry.getKey();
if (name.startsWith("strike")) {
LoggerConfig loggerConfig = (LoggerConfig) entry.getValue();
if (debug && !trace) {
loggerConfig.setLevel(Level.DEBUG);
ctx.updateLoggers();
}

if (trace) {
loggerConfig.setLevel(Level.TRACE);
ctx.updateLoggers();
}
}
}

if (trace) {
loggerConfig.setLevel(Level.TRACE);
ctx.updateLoggers();
if (debug && !trace) {
logger.debug("Server is running in DEBUG mode");
} else {
logger.trace("Server is running in TRACE mode");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import strike.service.ServerState;

public class ProtocolHandlerFactory {
private final static ServerState serverState = ServerState.getInstance();

/**
* @deprecated use newClientHandler() or newManagementHandler()
*/
Expand Down Expand Up @@ -46,8 +46,8 @@ public static IProtocolHandler newClientHandler(JSONObject jsonMessage, Runnable
}

// Added 16/20/16 by Ray
if (type.equalsIgnoreCase(Protocol.listserver.toString())){
return new ListServerProtocolHandler(jsonMessage,connection);
if (type.equalsIgnoreCase(Protocol.listserver.toString())) {
return new ListServerProtocolHandler(jsonMessage, connection);
}

// will check authentication inside handler itself
Expand Down Expand Up @@ -144,52 +144,51 @@ public static IProtocolHandler newManagementHandler(JSONObject jsonMessage, Runn
return new NotifyServerDownProtocolHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.startelection.toString())){
if (serverState.getIsFastBully()) {
if (type.equalsIgnoreCase(Protocol.startelection.toString())) {
if (ServerState.getInstance().getIsFastBully()) {
return new FastBullyStartElectionMessageHandler(jsonMessage, connection);
}
return new StartElectionMessageHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.answerelection.toString())) {
if (serverState.getIsFastBully()) {
if (ServerState.getInstance().getIsFastBully()) {
return new FastBullyAnswerElectionMessageHandler(jsonMessage, connection);
}
return new AnswerElectionMessageHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.coordinator.toString())){
if (serverState.getIsFastBully()) {
if (type.equalsIgnoreCase(Protocol.coordinator.toString())) {
if (ServerState.getInstance().getIsFastBully()) {
return new FastBullySetCoordinatorMessageHandler(jsonMessage, connection);
}
return new SetCoordinatorHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.viewelection.toString())){
if (type.equalsIgnoreCase(Protocol.viewelection.toString())) {
return new FastBullyViewMessageHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.nominationelection.toString())){
if (type.equalsIgnoreCase(Protocol.nominationelection.toString())) {
return new FastBullyNominationMessageHandler(jsonMessage, connection);
}

if(type.equalsIgnoreCase(Protocol.iamup.toString())){
if (type.equalsIgnoreCase(Protocol.iamup.toString())) {
return new FastBullyIAmUpMessageHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.gossip.toString())){
if (type.equalsIgnoreCase(Protocol.gossip.toString())) {
return new GossipProtocolHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.startvote.toString())){
if (type.equalsIgnoreCase(Protocol.startvote.toString())) {
return new StartVoteMessageHandler(jsonMessage, connection);
}

if (type.equalsIgnoreCase(Protocol.answervote.toString())){
if (type.equalsIgnoreCase(Protocol.answervote.toString())) {
return new AnswerVoteMessageHandler(jsonMessage, connection);
}


return new BlackHoleHandler();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import strike.common.model.Protocol;
import strike.handler.IProtocolHandler;
import strike.handler.management.ManagementHandler;
import strike.handler.management.election.StartElectionMessageHandler;
import strike.model.Lingo;

public class AnswerVoteMessageHandler extends ManagementHandler implements IProtocolHandler {

Expand All @@ -17,24 +17,23 @@ public AnswerVoteMessageHandler(JSONObject jsonMessage, Runnable connection) {
@Override
public void handle() {

String suspectServerId = (String) jsonMessage.get(Protocol.suspectserverid.toString());
String votedBy = (String) jsonMessage.get(Protocol.votedby.toString());
String vote = (String) jsonMessage.get(Protocol.vote.toString());
Integer voteCount = serverState.getVoteSet().get(vote);

logger.debug("Receiving voting message: " + jsonMessage);
Lingo.Consensus C = Lingo.Consensus.valueOf(vote);
Integer voteCount = serverState.getVoteSet().get(C);

logger.debug(String.format("Receiving voting to kick [%s]: [%s] voted by server: [%s]",
suspectServerId, vote, votedBy));

if (voteCount == null) {
serverState.getVoteSet().put(vote, 1);
serverState.getVoteSet().put(C, 1);
} else {
serverState.getVoteSet().put(vote, voteCount + 1);
serverState.getVoteSet().put(C, voteCount + 1);
}

// for (String vote : serverState.getVoteSet().keySet()) {
// Integer voteCount = serverState.getVoteSet().get(vote);
//
// }

}

private static final Logger logger = LogManager.getLogger(StartElectionMessageHandler.class);
private static final Logger logger = LogManager.getLogger(AnswerVoteMessageHandler.class);
}

Loading

0 comments on commit 1999d26

Please sign in to comment.